http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java deleted file mode 100644 index 3895506..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContextImpl.java +++ /dev/null @@ -1,621 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform; - -import org.apache.ignite.IgniteException; -import org.apache.ignite.cluster.ClusterMetrics; -import org.apache.ignite.cluster.ClusterNode; -import org.apache.ignite.events.CacheEvent; -import org.apache.ignite.events.CacheQueryExecutedEvent; -import org.apache.ignite.events.CacheQueryReadEvent; -import org.apache.ignite.events.CacheRebalancingEvent; -import org.apache.ignite.events.CheckpointEvent; -import org.apache.ignite.events.DiscoveryEvent; -import org.apache.ignite.events.Event; -import org.apache.ignite.events.EventAdapter; -import org.apache.ignite.events.EventType; -import org.apache.ignite.events.JobEvent; -import org.apache.ignite.events.SwapSpaceEvent; -import org.apache.ignite.events.TaskEvent; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.portable.GridPortableMarshaller; -import org.apache.ignite.internal.portable.PortableMetaDataImpl; -import org.apache.ignite.internal.portable.PortableRawReaderEx; -import org.apache.ignite.internal.portable.PortableRawWriterEx; -import org.apache.ignite.internal.processors.cache.portable.CacheObjectPortableProcessorImpl; -import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; -import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilterImpl; -import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryProcessor; -import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryProcessorImpl; -import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; -import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryFilter; -import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryImpl; -import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQueryRemoteFilter; -import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; -import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilter; -import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilterImpl; -import org.apache.ignite.internal.processors.platform.compute.PlatformAbstractTask; -import org.apache.ignite.internal.processors.platform.compute.PlatformClosureJob; -import org.apache.ignite.internal.processors.platform.compute.PlatformFullJob; -import org.apache.ignite.internal.processors.platform.compute.PlatformJob; -import org.apache.ignite.internal.processors.platform.datastreamer.PlatformStreamReceiver; -import org.apache.ignite.internal.processors.platform.datastreamer.PlatformStreamReceiverImpl; -import org.apache.ignite.internal.processors.platform.events.PlatformEventFilterListenerImpl; -import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; -import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; -import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManager; -import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl; -import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.processors.platform.message.PlatformMessageFilter; -import org.apache.ignite.internal.processors.platform.messaging.PlatformMessageFilterImpl; -import org.apache.ignite.internal.processors.platform.utils.PlatformReaderBiClosure; -import org.apache.ignite.internal.processors.platform.utils.PlatformReaderClosure; -import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.T4; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.portable.PortableMetadata; -import org.jetbrains.annotations.Nullable; - -import java.util.Collection; -import java.util.Collections; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -/** - * Implementation of platform context. - */ -public class PlatformContextImpl implements PlatformContext { - /** Supported event types. */ - private static final Set<Integer> evtTyps; - - /** Kernal context. */ - private final GridKernalContext ctx; - - /** Marshaller. */ - private final GridPortableMarshaller marsh; - - /** Memory manager. */ - private final PlatformMemoryManagerImpl mem; - - /** Callback gateway. */ - private final PlatformCallbackGateway gate; - - /** Cache object processor. */ - private final CacheObjectPortableProcessorImpl cacheObjProc; - - /** Node ids that has been sent to native platform. */ - private final Set<UUID> sentNodes = Collections.newSetFromMap(new ConcurrentHashMap<UUID, Boolean>()); - - /** - * Static initializer. - */ - static { - Set<Integer> evtTyps0 = new HashSet<>(); - - addEventTypes(evtTyps0, EventType.EVTS_CACHE); - addEventTypes(evtTyps0, EventType.EVTS_CACHE_QUERY); - addEventTypes(evtTyps0, EventType.EVTS_CACHE_REBALANCE); - addEventTypes(evtTyps0, EventType.EVTS_CHECKPOINT); - addEventTypes(evtTyps0, EventType.EVTS_DISCOVERY_ALL); - addEventTypes(evtTyps0, EventType.EVTS_JOB_EXECUTION); - addEventTypes(evtTyps0, EventType.EVTS_SWAPSPACE); - addEventTypes(evtTyps0, EventType.EVTS_TASK_EXECUTION); - - evtTyps = Collections.unmodifiableSet(evtTyps0); - } - - /** - * Adds all elements to a set. - * @param set Set. - * @param items Items. - */ - private static void addEventTypes(Set<Integer> set, int[] items) { - for (int i : items) - set.add(i); - } - - /** - * Constructor. - * - * @param ctx Kernal context. - * @param gate Callback gateway. - * @param mem Memory manager. - */ - public PlatformContextImpl(GridKernalContext ctx, PlatformCallbackGateway gate, PlatformMemoryManagerImpl mem) { - this.ctx = ctx; - this.gate = gate; - this.mem = mem; - - cacheObjProc = (CacheObjectPortableProcessorImpl)ctx.cacheObjects(); - - marsh = cacheObjProc.marshaller(); - } - - /** {@inheritDoc} */ - @Override public GridKernalContext kernalContext() { - return ctx; - } - - /** {@inheritDoc} */ - @Override public PlatformMemoryManager memory() { - return mem; - } - - /** {@inheritDoc} */ - @Override public PlatformCallbackGateway gateway() { - return gate; - } - - /** {@inheritDoc} */ - @Override public PortableRawReaderEx reader(PlatformMemory mem) { - return reader(mem.input()); - } - - /** {@inheritDoc} */ - @Override public PortableRawReaderEx reader(PlatformInputStream in) { - return marsh.reader(in); - } - - /** {@inheritDoc} */ - @Override public PortableRawWriterEx writer(PlatformMemory mem) { - return writer(mem.output()); - } - - /** {@inheritDoc} */ - @Override public PortableRawWriterEx writer(PlatformOutputStream out) { - return marsh.writer(out); - } - - /** {@inheritDoc} */ - @Override public void addNode(ClusterNode node) { - if (node == null || sentNodes.contains(node.id())) - return; - - // Send node info to the native platform - try (PlatformMemory mem0 = mem.allocate()) { - PlatformOutputStream out = mem0.output(); - - PortableRawWriterEx w = writer(out); - - w.writeUuid(node.id()); - - Map<String, Object> attrs = new HashMap<>(node.attributes()); - - Iterator<Map.Entry<String, Object>> attrIter = attrs.entrySet().iterator(); - - while (attrIter.hasNext()) { - Map.Entry<String, Object> entry = attrIter.next(); - - Object val = entry.getValue(); - - if (val != null && !val.getClass().getName().startsWith("java.lang")) - attrIter.remove(); - } - - w.writeMap(attrs); - w.writeCollection(node.addresses()); - w.writeCollection(node.hostNames()); - w.writeLong(node.order()); - w.writeBoolean(node.isLocal()); - w.writeBoolean(node.isDaemon()); - writeClusterMetrics(w, node.metrics()); - - out.synchronize(); - - gateway().nodeInfo(mem0.pointer()); - } - - sentNodes.add(node.id()); - } - - /** {@inheritDoc} */ - @Override public void writeNode(PortableRawWriterEx writer, ClusterNode node) { - if (node == null) { - writer.writeUuid(null); - - return; - } - - addNode(node); - - writer.writeUuid(node.id()); - } - - /** {@inheritDoc} */ - @Override public void writeNodes(PortableRawWriterEx writer, Collection<ClusterNode> nodes) { - if (nodes == null) { - writer.writeInt(-1); - - return; - } - - writer.writeInt(nodes.size()); - - for (ClusterNode n : nodes) { - addNode(n); - - writer.writeUuid(n.id()); - } - } - - /** {@inheritDoc} */ - @Override public void writeClusterMetrics(PortableRawWriterEx writer, @Nullable ClusterMetrics metrics) { - if (metrics == null) - writer.writeBoolean(false); - else { - writer.writeBoolean(true); - - writer.writeLong(metrics.getLastUpdateTime()); - writer.writeDate(new Date(metrics.getLastUpdateTime())); - writer.writeInt(metrics.getMaximumActiveJobs()); - writer.writeInt(metrics.getCurrentActiveJobs()); - writer.writeFloat(metrics.getAverageActiveJobs()); - writer.writeInt(metrics.getMaximumWaitingJobs()); - - writer.writeInt(metrics.getCurrentWaitingJobs()); - writer.writeFloat(metrics.getAverageWaitingJobs()); - writer.writeInt(metrics.getMaximumRejectedJobs()); - writer.writeInt(metrics.getCurrentRejectedJobs()); - writer.writeFloat(metrics.getAverageRejectedJobs()); - - writer.writeInt(metrics.getTotalRejectedJobs()); - writer.writeInt(metrics.getMaximumCancelledJobs()); - writer.writeInt(metrics.getCurrentCancelledJobs()); - writer.writeFloat(metrics.getAverageCancelledJobs()); - writer.writeInt(metrics.getTotalCancelledJobs()); - - writer.writeInt(metrics.getTotalExecutedJobs()); - writer.writeLong(metrics.getMaximumJobWaitTime()); - writer.writeLong(metrics.getCurrentJobWaitTime()); - writer.writeDouble(metrics.getAverageJobWaitTime()); - writer.writeLong(metrics.getMaximumJobExecuteTime()); - - writer.writeLong(metrics.getCurrentJobExecuteTime()); - writer.writeDouble(metrics.getAverageJobExecuteTime()); - writer.writeInt(metrics.getTotalExecutedTasks()); - writer.writeLong(metrics.getTotalIdleTime()); - writer.writeLong(metrics.getCurrentIdleTime()); - - writer.writeInt(metrics.getTotalCpus()); - writer.writeDouble(metrics.getCurrentCpuLoad()); - writer.writeDouble(metrics.getAverageCpuLoad()); - writer.writeDouble(metrics.getCurrentGcCpuLoad()); - writer.writeLong(metrics.getHeapMemoryInitialized()); - - writer.writeLong(metrics.getHeapMemoryUsed()); - writer.writeLong(metrics.getHeapMemoryCommitted()); - writer.writeLong(metrics.getHeapMemoryMaximum()); - writer.writeLong(metrics.getHeapMemoryTotal()); - writer.writeLong(metrics.getNonHeapMemoryInitialized()); - - writer.writeLong(metrics.getNonHeapMemoryUsed()); - writer.writeLong(metrics.getNonHeapMemoryCommitted()); - writer.writeLong(metrics.getNonHeapMemoryMaximum()); - writer.writeLong(metrics.getNonHeapMemoryTotal()); - writer.writeLong(metrics.getUpTime()); - - writer.writeDate(new Date(metrics.getStartTime())); - writer.writeDate(new Date(metrics.getNodeStartTime())); - writer.writeInt(metrics.getCurrentThreadCount()); - writer.writeInt(metrics.getMaximumThreadCount()); - writer.writeLong(metrics.getTotalStartedThreadCount()); - - writer.writeInt(metrics.getCurrentDaemonThreadCount()); - writer.writeLong(metrics.getLastDataVersion()); - writer.writeInt(metrics.getSentMessagesCount()); - writer.writeLong(metrics.getSentBytesCount()); - writer.writeInt(metrics.getReceivedMessagesCount()); - - writer.writeLong(metrics.getReceivedBytesCount()); - writer.writeInt(metrics.getOutboundMessagesQueueSize()); - - writer.writeInt(metrics.getTotalNodes()); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings("ConstantConditions") - @Override public void processMetadata(PortableRawReaderEx reader) { - Collection<T4<Integer, String, String, Map<String, Integer>>> metas = PlatformUtils.readCollection(reader, - new PlatformReaderClosure<T4<Integer, String, String, Map<String, Integer>>>() { - @Override public T4<Integer, String, String, Map<String, Integer>> read(PortableRawReaderEx reader) { - int typeId = reader.readInt(); - String typeName = reader.readString(); - String affKey = reader.readString(); - - Map<String, Integer> fields = PlatformUtils.readMap(reader, - new PlatformReaderBiClosure<String, Integer>() { - @Override public IgniteBiTuple<String, Integer> read(PortableRawReaderEx reader) { - return F.t(reader.readString(), reader.readInt()); - } - }); - - return new T4<>(typeId, typeName, affKey, fields); - } - } - ); - - for (T4<Integer, String, String, Map<String, Integer>> meta : metas) - cacheObjProc.updateMetaData(meta.get1(), meta.get2(), meta.get3(), meta.get4()); - } - - /** {@inheritDoc} */ - @Override public void writeMetadata(PortableRawWriterEx writer, int typeId) { - writeMetadata0(writer, typeId, cacheObjProc.metadata(typeId)); - } - - /** {@inheritDoc} */ - @Override public void writeAllMetadata(PortableRawWriterEx writer) { - Collection<PortableMetadata> metas = cacheObjProc.metadata(); - - writer.writeInt(metas.size()); - - for (org.apache.ignite.portable.PortableMetadata m : metas) - writeMetadata0(writer, cacheObjProc.typeId(m.typeName()), m); - } - - /** - * Write portable metadata. - * - * @param writer Writer. - * @param typeId Type id. - * @param meta Metadata. - */ - private void writeMetadata0(PortableRawWriterEx writer, int typeId, PortableMetadata meta) { - if (meta == null) - writer.writeBoolean(false); - else { - writer.writeBoolean(true); - - Map<String, String> metaFields = ((PortableMetaDataImpl)meta).fields0(); - - Map<String, Integer> fields = U.newHashMap(metaFields.size()); - - for (Map.Entry<String, String> metaField : metaFields.entrySet()) - fields.put(metaField.getKey(), CacheObjectPortableProcessorImpl.fieldTypeId(metaField.getValue())); - - writer.writeInt(typeId); - writer.writeString(meta.typeName()); - writer.writeString(meta.affinityKeyFieldName()); - writer.writeMap(fields); - } - } - - /** {@inheritDoc} */ - @Override public PlatformContinuousQuery createContinuousQuery(long ptr, boolean hasFilter, - @Nullable Object filter) { - return new PlatformContinuousQueryImpl(this, ptr, hasFilter, filter); - } - - /** {@inheritDoc} */ - @Override public PlatformContinuousQueryFilter createContinuousQueryFilter(Object filter) { - return new PlatformContinuousQueryRemoteFilter(filter); - } - - /** {@inheritDoc} */ - @Override public PlatformMessageFilter createRemoteMessageFilter(Object filter, long ptr) { - return new PlatformMessageFilterImpl(filter, ptr, this); - } - - /** {@inheritDoc} */ - @Override public boolean isEventTypeSupported(int evtTyp) { - return evtTyps.contains(evtTyp); - } - - /** {@inheritDoc} */ - @Override public void writeEvent(PortableRawWriterEx writer, Event evt) { - assert writer != null; - - if (evt == null) - { - writer.writeInt(-1); - - return; - } - - EventAdapter evt0 = (EventAdapter)evt; - - if (evt0 instanceof CacheEvent) { - writer.writeInt(2); - writeCommonEventData(writer, evt0); - - CacheEvent event0 = (CacheEvent)evt0; - - writer.writeString(event0.cacheName()); - writer.writeInt(event0.partition()); - writer.writeBoolean(event0.isNear()); - writeNode(writer, event0.eventNode()); - writer.writeObject(event0.key()); - PlatformUtils.writeIgniteUuid(writer, event0.xid()); - writer.writeObject(event0.lockId()); - writer.writeObject(event0.newValue()); - writer.writeObject(event0.oldValue()); - writer.writeBoolean(event0.hasOldValue()); - writer.writeBoolean(event0.hasNewValue()); - writer.writeUuid(event0.subjectId()); - writer.writeString(event0.closureClassName()); - writer.writeString(event0.taskName()); - } - else if (evt0 instanceof CacheQueryExecutedEvent) { - writer.writeInt(3); - writeCommonEventData(writer, evt0); - - CacheQueryExecutedEvent event0 = (CacheQueryExecutedEvent)evt0; - - writer.writeString(event0.queryType()); - writer.writeString(event0.cacheName()); - writer.writeString(event0.className()); - writer.writeString(event0.clause()); - writer.writeUuid(event0.subjectId()); - writer.writeString(event0.taskName()); - } - else if (evt0 instanceof CacheQueryReadEvent) { - writer.writeInt(4); - writeCommonEventData(writer, evt0); - - CacheQueryReadEvent event0 = (CacheQueryReadEvent)evt0; - - writer.writeString(event0.queryType()); - writer.writeString(event0.cacheName()); - writer.writeString(event0.className()); - writer.writeString(event0.clause()); - writer.writeUuid(event0.subjectId()); - writer.writeString(event0.taskName()); - writer.writeObject(event0.key()); - writer.writeObject(event0.value()); - writer.writeObject(event0.oldValue()); - writer.writeObject(event0.row()); - } - else if (evt0 instanceof CacheRebalancingEvent) { - writer.writeInt(5); - writeCommonEventData(writer, evt0); - - CacheRebalancingEvent event0 = (CacheRebalancingEvent)evt0; - - writer.writeString(event0.cacheName()); - writer.writeInt(event0.partition()); - writeNode(writer, event0.discoveryNode()); - writer.writeInt(event0.discoveryEventType()); - writer.writeString(event0.discoveryEventName()); - writer.writeLong(event0.discoveryTimestamp()); - } - else if (evt0 instanceof CheckpointEvent) { - writer.writeInt(6); - writeCommonEventData(writer, evt0); - - CheckpointEvent event0 = (CheckpointEvent)evt0; - - writer.writeString(event0.key()); - } - else if (evt0 instanceof DiscoveryEvent) { - writer.writeInt(7); - writeCommonEventData(writer, evt0); - - DiscoveryEvent event0 = (DiscoveryEvent)evt0; - - writeNode(writer, event0.eventNode()); - writer.writeLong(event0.topologyVersion()); - - writeNodes(writer, event0.topologyNodes()); - } - else if (evt0 instanceof JobEvent) { - writer.writeInt(8); - writeCommonEventData(writer, evt0); - - JobEvent event0 = (JobEvent)evt0; - - writer.writeString(event0.taskName()); - writer.writeString(event0.taskClassName()); - PlatformUtils.writeIgniteUuid(writer, event0.taskSessionId()); - PlatformUtils.writeIgniteUuid(writer, event0.jobId()); - writeNode(writer, event0.taskNode()); - writer.writeUuid(event0.taskSubjectId()); - } - else if (evt0 instanceof SwapSpaceEvent) { - writer.writeInt(9); - writeCommonEventData(writer, evt0); - - SwapSpaceEvent event0 = (SwapSpaceEvent)evt0; - - writer.writeString(event0.space()); - } - else if (evt0 instanceof TaskEvent) { - writer.writeInt(10); - writeCommonEventData(writer, evt0); - - TaskEvent event0 = (TaskEvent)evt0; - - writer.writeString(event0.taskName()); - writer.writeString(event0.taskClassName()); - PlatformUtils.writeIgniteUuid(writer, event0.taskSessionId()); - writer.writeBoolean(event0.internal()); - writer.writeUuid(event0.subjectId()); - } - else - throw new IgniteException("Unsupported event: " + evt); - } - - /** - * Write common event data. - * - * @param writer Writer. - * @param evt Event. - */ - private void writeCommonEventData(PortableRawWriterEx writer, EventAdapter evt) { - PlatformUtils.writeIgniteUuid(writer, evt.id()); - writer.writeLong(evt.localOrder()); - writeNode(writer, evt.node()); - writer.writeString(evt.message()); - writer.writeInt(evt.type()); - writer.writeString(evt.name()); - writer.writeDate(new Date(evt.timestamp())); - } - - /** {@inheritDoc} */ - @Override public PlatformEventFilterListener createLocalEventFilter(long hnd) { - return new PlatformEventFilterListenerImpl(hnd, this); - } - - /** {@inheritDoc} */ - @Override public PlatformEventFilterListener createRemoteEventFilter(Object pred, int... types) { - return new PlatformEventFilterListenerImpl(pred, types); - } - - /** {@inheritDoc} */ - @Override public PlatformNativeException createNativeException(Object cause) { - return new PlatformNativeException(cause); - } - - /** {@inheritDoc} */ - @Override public PlatformJob createJob(Object task, long ptr, @Nullable Object job) { - return new PlatformFullJob(this, (PlatformAbstractTask)task, ptr, job); - } - - /** {@inheritDoc} */ - @Override public PlatformJob createClosureJob(Object task, long ptr, Object job) { - return new PlatformClosureJob((PlatformAbstractTask)task, ptr, job); - } - - /** {@inheritDoc} */ - @Override public PlatformCacheEntryProcessor createCacheEntryProcessor(Object proc, long ptr) { - return new PlatformCacheEntryProcessorImpl(proc, ptr); - } - - /** {@inheritDoc} */ - @Override public PlatformCacheEntryFilter createCacheEntryFilter(Object filter, long ptr) { - return new PlatformCacheEntryFilterImpl(filter, ptr, this); - } - - /** {@inheritDoc} */ - @Override public PlatformStreamReceiver createStreamReceiver(Object rcv, long ptr, boolean keepPortable) { - return new PlatformStreamReceiverImpl(rcv, ptr, keepPortable, this); - } - - /** {@inheritDoc} */ - @Override public PlatformClusterNodeFilter createClusterNodeFilter(Object filter) { - return new PlatformClusterNodeFilterImpl(filter, this); - } -}
http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java deleted file mode 100644 index e642b2d..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformIgnition.java +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform; - -import java.net.URL; -import java.security.AccessController; -import java.security.PrivilegedAction; -import java.util.HashMap; -import java.util.ServiceLoader; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteException; -import org.apache.ignite.Ignition; -import org.apache.ignite.configuration.IgniteConfiguration; -import org.apache.ignite.internal.IgnitionEx; -import org.apache.ignite.internal.processors.resource.GridSpringResourceContext; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.apache.ignite.lang.IgniteBiTuple; -import org.jetbrains.annotations.Nullable; - -/** - * Entry point for platform nodes. - */ -@SuppressWarnings("UnusedDeclaration") -public class PlatformIgnition { - /** Map with active instances. */ - private static final HashMap<String, PlatformProcessor> instances = new HashMap<>(); - - /** - * Start Ignite node in platform mode. - * - * @param springCfgPath Spring configuration path. - * @param gridName Grid name. - * @param factoryId Factory ID. - * @param envPtr Environment pointer. - * @param dataPtr Optional pointer to additional data required for startup. - * @return Ignite instance. - */ - public static synchronized PlatformProcessor start(@Nullable String springCfgPath, @Nullable String gridName, - int factoryId, long envPtr, long dataPtr) { - if (envPtr <= 0) - throw new IgniteException("Environment pointer must be positive."); - - ClassLoader oldClsLdr = Thread.currentThread().getContextClassLoader(); - - Thread.currentThread().setContextClassLoader(PlatformProcessor.class.getClassLoader()); - - try { - IgniteConfiguration cfg = configuration(springCfgPath); - - if (gridName != null) - cfg.setGridName(gridName); - else - gridName = cfg.getGridName(); - - PlatformBootstrap bootstrap = bootstrap(factoryId); - - PlatformProcessor proc = bootstrap.start(cfg, envPtr, dataPtr); - - PlatformProcessor old = instances.put(gridName, proc); - - assert old == null; - - return proc; - } - finally { - Thread.currentThread().setContextClassLoader(oldClsLdr); - } - } - - /** - * Get instance by environment pointer. - * - * @param gridName Grid name. - * @return Instance or {@code null} if it doesn't exist (never started or stopped). - */ - @Nullable public static synchronized PlatformProcessor instance(@Nullable String gridName) { - return instances.get(gridName); - } - - /** - * Get environment pointer of the given instance. - * - * @param gridName Grid name. - * @return Environment pointer or {@code 0} in case grid with such name doesn't exist. - */ - public static synchronized long environmentPointer(@Nullable String gridName) { - PlatformProcessor proc = instance(gridName); - - return proc != null ? proc.environmentPointer() : 0; - } - - /** - * Stop single instance. - * - * @param gridName Grid name, - * @param cancel Cancel flag. - * @return {@code True} if instance was found and stopped. - */ - public static synchronized boolean stop(@Nullable String gridName, boolean cancel) { - if (Ignition.stop(gridName, cancel)) { - PlatformProcessor old = instances.remove(gridName); - - assert old != null; - - return true; - } - else - return false; - } - - /** - * Stop all instances. - * - * @param cancel Cancel flag. - */ - public static synchronized void stopAll(boolean cancel) { - for (PlatformProcessor proc : instances.values()) - Ignition.stop(proc.ignite().name(), cancel); - - instances.clear(); - } - - /** - * Create configuration. - * - * @param springCfgPath Path to Spring XML. - * @return Configuration. - */ - private static IgniteConfiguration configuration(@Nullable String springCfgPath) { - try { - URL url = springCfgPath == null ? U.resolveIgniteUrl(IgnitionEx.DFLT_CFG) : - U.resolveSpringUrl(springCfgPath); - - IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> t = IgnitionEx.loadConfiguration(url); - - return t.get1(); - } - catch (IgniteCheckedException e) { - throw new IgniteException("Failed to instantiate configuration from Spring XML: " + springCfgPath, e); - } - } - - /** - * Create bootstrap for the given factory ID. - * - * @param factoryId Factory ID. - * @return Bootstrap. - */ - private static PlatformBootstrap bootstrap(final int factoryId) { - PlatformBootstrapFactory factory = AccessController.doPrivileged( - new PrivilegedAction<PlatformBootstrapFactory>() { - @Override public PlatformBootstrapFactory run() { - for (PlatformBootstrapFactory factory : ServiceLoader.load(PlatformBootstrapFactory.class)) { - if (factory.id() == factoryId) - return factory; - } - - return null; - } - }); - - if (factory == null) - throw new IgniteException("Interop factory is not found (did you put into the classpath?): " + factoryId); - - return factory.create(); - } - - /** - * Private constructor. - */ - private PlatformIgnition() { - // No-op. - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java deleted file mode 100644 index 40b1334..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java +++ /dev/null @@ -1,360 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform; - -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.IgniteDataStreamer; -import org.apache.ignite.IgniteException; -import org.apache.ignite.IgniteLogger; -import org.apache.ignite.configuration.PlatformConfiguration; -import org.apache.ignite.internal.GridKernalContext; -import org.apache.ignite.internal.IgniteComputeImpl; -import org.apache.ignite.internal.cluster.ClusterGroupAdapter; -import org.apache.ignite.internal.portable.PortableRawWriterEx; -import org.apache.ignite.internal.processors.GridProcessorAdapter; -import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; -import org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl; -import org.apache.ignite.internal.processors.platform.cache.PlatformCache; -import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinity; -import org.apache.ignite.internal.processors.platform.cache.store.PlatformCacheStore; -import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterGroup; -import org.apache.ignite.internal.processors.platform.compute.PlatformCompute; -import org.apache.ignite.internal.processors.platform.datastreamer.PlatformDataStreamer; -import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore; -import org.apache.ignite.internal.processors.platform.events.PlatformEvents; -import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; -import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.processors.platform.messaging.PlatformMessaging; -import org.apache.ignite.internal.processors.platform.services.PlatformServices; -import org.apache.ignite.internal.processors.platform.transactions.PlatformTransactions; -import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.internal.util.typedef.F; -import org.apache.ignite.internal.util.typedef.internal.U; -import org.jetbrains.annotations.Nullable; - -import java.util.Collection; -import java.util.Collections; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -/** - * GridGain platform processor. - */ -public class PlatformProcessorImpl extends GridProcessorAdapter implements PlatformProcessor { - /** Start latch. */ - private final CountDownLatch startLatch = new CountDownLatch(1); - - /** Stores pending initialization. */ - private final Collection<StoreInfo> pendingStores = - Collections.newSetFromMap(new ConcurrentHashMap<StoreInfo, Boolean>()); - - /** Started stores. */ - private final Collection<PlatformCacheStore> stores = - Collections.newSetFromMap(new ConcurrentHashMap<PlatformCacheStore, Boolean>()); - - /** Lock for store lifecycle operations. */ - private final ReadWriteLock storeLock = new ReentrantReadWriteLock(); - - /** Logger. */ - private final IgniteLogger log; - - /** Context. */ - private final PlatformContext platformCtx; - - /** Interop configuration. */ - private final PlatformConfigurationEx interopCfg; - - /** Whether processor is started. */ - private boolean started; - - /** Whether processor if stopped (or stopping). */ - private boolean stopped; - - /** - * Constructor. - * - * @param ctx Kernal context. - */ - public PlatformProcessorImpl(GridKernalContext ctx) { - super(ctx); - - log = ctx.log(PlatformProcessorImpl.class); - - PlatformConfiguration interopCfg0 = ctx.config().getPlatformConfiguration(); - - assert interopCfg0 != null : "Must be checked earlier during component creation."; - - if (!(interopCfg0 instanceof PlatformConfigurationEx)) - throw new IgniteException("Unsupported platform configuration: " + interopCfg0.getClass().getName()); - - interopCfg = (PlatformConfigurationEx)interopCfg0; - - if (!F.isEmpty(interopCfg.warnings())) { - for (String w : interopCfg.warnings()) - U.warn(log, w); - } - - platformCtx = new PlatformContextImpl(ctx, interopCfg.gate(), interopCfg.memory()); - } - - /** {@inheritDoc} */ - @Override public void start() throws IgniteCheckedException { - try (PlatformMemory mem = platformCtx.memory().allocate()) { - PlatformOutputStream out = mem.output(); - - PortableRawWriterEx writer = platformCtx.writer(out); - - writer.writeString(ctx.gridName()); - - out.synchronize(); - - platformCtx.gateway().onStart(this, mem.pointer()); - } - - // At this moment all necessary native libraries must be loaded, so we can process with store creation. - storeLock.writeLock().lock(); - - try { - for (StoreInfo store : pendingStores) - registerStore0(store.store, store.convertPortable); - - pendingStores.clear(); - - started = true; - } - finally { - storeLock.writeLock().unlock(); - } - - // Add Interop node attributes. - ctx.addNodeAttribute(PlatformUtils.ATTR_PLATFORM, interopCfg.platform()); - } - - /** {@inheritDoc} */ - @Override public void onKernalStop(boolean cancel) { - startLatch.countDown(); - } - - /** {@inheritDoc} */ - @Override public void stop(boolean cancel) throws IgniteCheckedException { - if (platformCtx != null) { - // Destroy cache stores. - storeLock.writeLock().lock(); - - try { - for (PlatformCacheStore store : stores) { - if (store != null) { - if (store instanceof PlatformDotNetCacheStore) { - PlatformDotNetCacheStore store0 = (PlatformDotNetCacheStore)store; - - try { - store0.destroy(platformCtx.kernalContext()); - } - catch (Exception e) { - U.error(log, "Failed to destroy .Net cache store [store=" + store0 + - ", err=" + e.getMessage() + ']'); - } - } - else - assert false : "Invalid interop cache store type: " + store; - } - } - } - finally { - stopped = true; - - storeLock.writeLock().unlock(); - } - - platformCtx.gateway().onStop(); - } - } - - /** {@inheritDoc} */ - @Override public Ignite ignite() { - return ctx.grid(); - } - - /** {@inheritDoc} */ - @Override public long environmentPointer() { - return platformCtx.gateway().environmentPointer(); - } - - /** {@inheritDoc} */ - public void releaseStart() { - startLatch.countDown(); - } - - /** {@inheritDoc} */ - public void awaitStart() throws IgniteCheckedException { - U.await(startLatch); - } - - /** {@inheritDoc} */ - @Override public PlatformContext context() { - return platformCtx; - } - - /** {@inheritDoc} */ - @Override public PlatformTarget cache(@Nullable String name) throws IgniteCheckedException { - IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().cache(name); - - if (cache == null) - throw new IllegalArgumentException("Cache doesn't exist: " + name); - - return new PlatformCache(platformCtx, cache.keepPortable(), false); - } - - /** {@inheritDoc} */ - @Override public PlatformTarget createCache(@Nullable String name) throws IgniteCheckedException { - IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().createCache(name); - - assert cache != null; - - return new PlatformCache(platformCtx, cache.keepPortable(), false); - } - - /** {@inheritDoc} */ - @Override public PlatformTarget getOrCreateCache(@Nullable String name) throws IgniteCheckedException { - IgniteCacheProxy cache = (IgniteCacheProxy)ctx.grid().getOrCreateCache(name); - - assert cache != null; - - return new PlatformCache(platformCtx, cache.keepPortable(), false); - } - - /** {@inheritDoc} */ - @Override public PlatformTarget affinity(@Nullable String name) throws IgniteCheckedException { - return new PlatformAffinity(platformCtx, ctx, name); - } - - /** {@inheritDoc} */ - @Override public PlatformTarget dataStreamer(@Nullable String cacheName, boolean keepPortable) - throws IgniteCheckedException { - IgniteDataStreamer ldr = ctx.dataStream().dataStreamer(cacheName); - - return new PlatformDataStreamer(platformCtx, cacheName, (DataStreamerImpl)ldr, keepPortable); - } - - /** {@inheritDoc} */ - @Override public PlatformTarget transactions() { - return new PlatformTransactions(platformCtx); - } - - /** {@inheritDoc} */ - @Override public PlatformTarget projection() throws IgniteCheckedException { - return new PlatformClusterGroup(platformCtx, ctx.grid().cluster()); - } - - /** {@inheritDoc} */ - @Override public PlatformTarget compute(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; - - assert grp0.projection() instanceof ClusterGroupAdapter; // Safety for very complex ClusterGroup hierarchy. - - return new PlatformCompute(platformCtx, (IgniteComputeImpl)((ClusterGroupAdapter)grp0.projection()).compute()); - } - - /** {@inheritDoc} */ - @Override public PlatformTarget message(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; - - return new PlatformMessaging(platformCtx, grp0.projection().ignite().message(grp0.projection())); - } - - /** {@inheritDoc} */ - @Override public PlatformTarget events(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; - - return new PlatformEvents(platformCtx, grp0.projection().ignite().events(grp0.projection())); - } - - /** {@inheritDoc} */ - @Override public PlatformTarget services(PlatformTarget grp) { - PlatformClusterGroup grp0 = (PlatformClusterGroup)grp; - - return new PlatformServices(platformCtx, grp0.projection().ignite().services(grp0.projection()), false); - } - - /** {@inheritDoc} */ - @Override public PlatformTarget extensions() { - return null; - } - - /** {@inheritDoc} */ - @Override public void registerStore(PlatformCacheStore store, boolean convertPortable) - throws IgniteCheckedException { - storeLock.readLock().lock(); - - try { - if (stopped) - throw new IgniteCheckedException("Failed to initialize interop store becuase node is stopping: " + - store); - - if (started) - registerStore0(store, convertPortable); - else - pendingStores.add(new StoreInfo(store, convertPortable)); - } - finally { - storeLock.readLock().unlock(); - } - } - - /** - * Internal store initialization routine. - * - * @param store Store. - * @param convertPortable Convert portable flag. - * @throws IgniteCheckedException If failed. - */ - private void registerStore0(PlatformCacheStore store, boolean convertPortable) throws IgniteCheckedException { - if (store instanceof PlatformDotNetCacheStore) { - PlatformDotNetCacheStore store0 = (PlatformDotNetCacheStore)store; - - store0.initialize(ctx, convertPortable); - } - else - throw new IgniteCheckedException("Unsupported interop store: " + store); - } - - /** - * Store and manager pair. - */ - private static class StoreInfo { - /** Store. */ - private final PlatformCacheStore store; - - /** Convert portable flag. */ - private final boolean convertPortable; - - /** - * Constructor. - * - * @param store Store. - * @param convertPortable Convert portable flag. - */ - private StoreInfo(PlatformCacheStore store, boolean convertPortable) { - this.store = store; - this.convertPortable = convertPortable; - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java deleted file mode 100644 index ecdfc2c..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCache.java +++ /dev/null @@ -1,1090 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform.cache; - -import org.apache.ignite.IgniteCache; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheEntryProcessor; -import org.apache.ignite.cache.CacheMetrics; -import org.apache.ignite.cache.CachePartialUpdateException; -import org.apache.ignite.cache.CachePeekMode; -import org.apache.ignite.cache.query.Query; -import org.apache.ignite.cache.query.ScanQuery; -import org.apache.ignite.cache.query.SqlFieldsQuery; -import org.apache.ignite.cache.query.SqlQuery; -import org.apache.ignite.cache.query.TextQuery; -import org.apache.ignite.internal.portable.PortableRawReaderEx; -import org.apache.ignite.internal.portable.PortableRawWriterEx; -import org.apache.ignite.internal.processors.cache.CacheOperationContext; -import org.apache.ignite.internal.processors.cache.CachePartialUpdateCheckedException; -import org.apache.ignite.internal.processors.cache.IgniteCacheProxy; -import org.apache.ignite.internal.processors.cache.query.QueryCursorEx; -import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; -import org.apache.ignite.internal.processors.platform.PlatformContext; -import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; -import org.apache.ignite.internal.processors.platform.cache.query.PlatformFieldsQueryCursor; -import org.apache.ignite.internal.processors.platform.cache.query.PlatformQueryCursor; -import org.apache.ignite.internal.processors.platform.PlatformNativeException; -import org.apache.ignite.internal.processors.platform.utils.PlatformFutureUtils; -import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.internal.util.GridConcurrentFactory; -import org.apache.ignite.internal.util.typedef.C1; -import org.apache.ignite.lang.IgniteFuture; -import org.jetbrains.annotations.Nullable; - -import javax.cache.Cache; -import javax.cache.expiry.Duration; -import javax.cache.expiry.ExpiryPolicy; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.EntryProcessorResult; -import java.util.Iterator; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; - -/** - * Native cache wrapper implementation. - */ -@SuppressWarnings({"unchecked", "UnusedDeclaration", "TryFinallyCanBeTryWithResources"}) -public class PlatformCache extends PlatformAbstractTarget { - /** */ - public static final int OP_CLEAR = 1; - - /** */ - public static final int OP_CLEAR_ALL = 2; - - /** */ - public static final int OP_CONTAINS_KEY = 3; - - /** */ - public static final int OP_CONTAINS_KEYS = 4; - - /** */ - public static final int OP_GET = 5; - - /** */ - public static final int OP_GET_ALL = 6; - - /** */ - public static final int OP_GET_AND_PUT = 7; - - /** */ - public static final int OP_GET_AND_PUT_IF_ABSENT = 8; - - /** */ - public static final int OP_GET_AND_REMOVE = 9; - - /** */ - public static final int OP_GET_AND_REPLACE = 10; - - /** */ - public static final int OP_GET_NAME = 11; - - /** */ - public static final int OP_INVOKE = 12; - - /** */ - public static final int OP_INVOKE_ALL = 13; - - /** */ - public static final int OP_IS_LOCAL_LOCKED = 14; - - /** */ - public static final int OP_LOAD_CACHE = 15; - - /** */ - public static final int OP_LOC_EVICT = 16; - - /** */ - public static final int OP_LOC_LOAD_CACHE = 17; - - /** */ - public static final int OP_LOC_PROMOTE = 18; - - /** */ - public static final int OP_LOCAL_CLEAR = 20; - - /** */ - public static final int OP_LOCAL_CLEAR_ALL = 21; - - /** */ - public static final int OP_LOCK = 22; - - /** */ - public static final int OP_LOCK_ALL = 23; - - /** */ - public static final int OP_METRICS = 24; - - /** */ - private static final int OP_PEEK = 25; - - /** */ - private static final int OP_PUT = 26; - - /** */ - private static final int OP_PUT_ALL = 27; - - /** */ - public static final int OP_PUT_IF_ABSENT = 28; - - /** */ - public static final int OP_QRY_CONTINUOUS = 29; - - /** */ - public static final int OP_QRY_SCAN = 30; - - /** */ - public static final int OP_QRY_SQL = 31; - - /** */ - public static final int OP_QRY_SQL_FIELDS = 32; - - /** */ - public static final int OP_QRY_TXT = 33; - - /** */ - public static final int OP_REMOVE_ALL = 34; - - /** */ - public static final int OP_REMOVE_BOOL = 35; - - /** */ - public static final int OP_REMOVE_OBJ = 36; - - /** */ - public static final int OP_REPLACE_2 = 37; - - /** */ - public static final int OP_REPLACE_3 = 38; - - /** Underlying JCache. */ - private final IgniteCacheProxy cache; - - /** Whether this cache is created with "keepPortable" flag on the other side. */ - private final boolean keepPortable; - - /** */ - private static final GetAllWriter WRITER_GET_ALL = new GetAllWriter(); - - /** */ - private static final EntryProcessorInvokeWriter WRITER_INVOKE = new EntryProcessorInvokeWriter(); - - /** */ - private static final EntryProcessorInvokeAllWriter WRITER_INVOKE_ALL = new EntryProcessorInvokeAllWriter(); - - /** Map with currently active locks. */ - private final ConcurrentMap<Long, Lock> lockMap = GridConcurrentFactory.newMap(); - - /** Lock ID sequence. */ - private static final AtomicLong LOCK_ID_GEN = new AtomicLong(); - - /** - * Constructor. - * - * @param platformCtx Context. - * @param cache Underlying cache. - * @param keepPortable Keep portable flag. - */ - public PlatformCache(PlatformContext platformCtx, IgniteCache cache, boolean keepPortable) { - super(platformCtx); - - this.cache = (IgniteCacheProxy)cache; - this.keepPortable = keepPortable; - } - - /** - * Gets cache with "skip-store" flag set. - * - * @return Cache with "skip-store" flag set. - */ - public PlatformCache withSkipStore() { - if (cache.delegate().skipStore()) - return this; - - return new PlatformCache(platformCtx, cache.withSkipStore(), keepPortable); - } - - /** - * Gets cache with "keep portable" flag. - * - * @return Cache with "keep portable" flag set. - */ - public PlatformCache withKeepPortable() { - if (keepPortable) - return this; - - return new PlatformCache(platformCtx, cache.withSkipStore(), true); - } - - /** - * Gets cache with provided expiry policy. - * - * @param create Create. - * @param update Update. - * @param access Access. - * @return Cache. - */ - public PlatformCache withExpiryPolicy(final long create, final long update, final long access) { - IgniteCache cache0 = cache.withExpiryPolicy(new InteropExpiryPolicy(create, update, access)); - - return new PlatformCache(platformCtx, cache0, keepPortable); - } - - /** - * Gets cache with asynchronous mode enabled. - * - * @return Cache with asynchronous mode enabled. - */ - public PlatformCache withAsync() { - if (cache.isAsync()) - return this; - - return new PlatformCache(platformCtx, (IgniteCache)cache.withAsync(), keepPortable); - } - - /** - * Gets cache with no-retries mode enabled. - * - * @return Cache with no-retries mode enabled. - */ - public PlatformCache withNoRetries() { - CacheOperationContext opCtx = cache.operationContext(); - - if (opCtx != null && opCtx.noRetries()) - return this; - - return new PlatformCache(platformCtx, cache.withNoRetries(), keepPortable); - } - - /** {@inheritDoc} */ - @Override protected long processInStreamOutLong(int type, PortableRawReaderEx reader) throws IgniteCheckedException { - switch (type) { - case OP_PUT: - cache.put(reader.readObjectDetached(), reader.readObjectDetached()); - - return TRUE; - - case OP_REMOVE_BOOL: - return cache.remove(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; - - case OP_REMOVE_ALL: - cache.removeAll(PlatformUtils.readSet(reader)); - - return TRUE; - - case OP_PUT_ALL: - cache.putAll(PlatformUtils.readMap(reader)); - - return TRUE; - - case OP_LOC_EVICT: - cache.localEvict(PlatformUtils.readCollection(reader)); - - return TRUE; - - case OP_CONTAINS_KEY: - return cache.containsKey(reader.readObjectDetached()) ? TRUE : FALSE; - - case OP_CONTAINS_KEYS: - return cache.containsKeys(PlatformUtils.readSet(reader)) ? TRUE : FALSE; - - case OP_LOC_PROMOTE: { - cache.localPromote(PlatformUtils.readSet(reader)); - - break; - } - - case OP_REPLACE_3: - return cache.replace(reader.readObjectDetached(), reader.readObjectDetached(), - reader.readObjectDetached()) ? TRUE : FALSE; - - case OP_LOC_LOAD_CACHE: - loadCache0(reader, true); - - break; - - case OP_LOAD_CACHE: - loadCache0(reader, false); - - break; - - case OP_CLEAR: - cache.clear(reader.readObjectDetached()); - - break; - - case OP_CLEAR_ALL: - cache.clearAll(PlatformUtils.readSet(reader)); - - break; - - case OP_LOCAL_CLEAR: - cache.localClear(reader.readObjectDetached()); - - break; - - case OP_LOCAL_CLEAR_ALL: - cache.localClearAll(PlatformUtils.readSet(reader)); - - break; - - case OP_PUT_IF_ABSENT: { - return cache.putIfAbsent(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; - } - - case OP_REPLACE_2: { - return cache.replace(reader.readObjectDetached(), reader.readObjectDetached()) ? TRUE : FALSE; - } - - case OP_REMOVE_OBJ: { - return cache.remove(reader.readObjectDetached()) ? TRUE : FALSE; - } - - case OP_IS_LOCAL_LOCKED: - return cache.isLocalLocked(reader.readObjectDetached(), reader.readBoolean()) ? TRUE : FALSE; - - default: - return super.processInStreamOutLong(type, reader); - } - - return TRUE; - } - - /** - * Loads cache via localLoadCache or loadCache. - */ - private void loadCache0(PortableRawReaderEx reader, boolean loc) throws IgniteCheckedException { - PlatformCacheEntryFilter filter = null; - - Object pred = reader.readObjectDetached(); - - if (pred != null) - filter = platformCtx.createCacheEntryFilter(pred, reader.readLong()); - - Object[] args = reader.readObjectArray(); - - if (loc) - cache.localLoadCache(filter, args); - else - cache.loadCache(filter, args); - } - - /** {@inheritDoc} */ - @Override protected Object processInStreamOutObject(int type, PortableRawReaderEx reader) - throws IgniteCheckedException { - switch (type) { - case OP_QRY_SQL: - return runQuery(reader, readSqlQuery(reader)); - - case OP_QRY_SQL_FIELDS: - return runFieldsQuery(reader, readFieldsQuery(reader)); - - case OP_QRY_TXT: - return runQuery(reader, readTextQuery(reader)); - - case OP_QRY_SCAN: - return runQuery(reader, readScanQuery(reader)); - - case OP_QRY_CONTINUOUS: { - long ptr = reader.readLong(); - boolean loc = reader.readBoolean(); - boolean hasFilter = reader.readBoolean(); - Object filter = reader.readObjectDetached(); - int bufSize = reader.readInt(); - long timeInterval = reader.readLong(); - boolean autoUnsubscribe = reader.readBoolean(); - Query initQry = readInitialQuery(reader); - - PlatformContinuousQuery qry = platformCtx.createContinuousQuery(ptr, hasFilter, filter); - - qry.start(cache, loc, bufSize, timeInterval, autoUnsubscribe, initQry); - - return qry; - } - - default: - return super.processInStreamOutObject(type, reader); - } - } - - /** - * Read arguments for SQL query. - * - * @param reader Reader. - * @return Arguments. - */ - @Nullable private Object[] readQueryArgs(PortableRawReaderEx reader) { - int cnt = reader.readInt(); - - if (cnt > 0) { - Object[] args = new Object[cnt]; - - for (int i = 0; i < cnt; i++) - args[i] = reader.readObjectDetached(); - - return args; - } - else - return null; - } - - /** {@inheritDoc} */ - @Override protected void processOutStream(int type, PortableRawWriterEx writer) throws IgniteCheckedException { - switch (type) { - case OP_GET_NAME: - writer.writeObject(cache.getName()); - - break; - - case OP_METRICS: - CacheMetrics metrics = cache.metrics(); - - writer.writeLong(metrics.getCacheGets()); - writer.writeLong(metrics.getCachePuts()); - writer.writeLong(metrics.getCacheHits()); - writer.writeLong(metrics.getCacheMisses()); - writer.writeLong(metrics.getCacheTxCommits()); - writer.writeLong(metrics.getCacheTxRollbacks()); - writer.writeLong(metrics.getCacheEvictions()); - writer.writeLong(metrics.getCacheRemovals()); - writer.writeFloat(metrics.getAveragePutTime()); - writer.writeFloat(metrics.getAverageGetTime()); - writer.writeFloat(metrics.getAverageRemoveTime()); - writer.writeFloat(metrics.getAverageTxCommitTime()); - writer.writeFloat(metrics.getAverageTxRollbackTime()); - writer.writeString(metrics.name()); - writer.writeLong(metrics.getOverflowSize()); - writer.writeLong(metrics.getOffHeapEntriesCount()); - writer.writeLong(metrics.getOffHeapAllocatedSize()); - writer.writeInt(metrics.getSize()); - writer.writeInt(metrics.getKeySize()); - writer.writeBoolean(metrics.isEmpty()); - writer.writeInt(metrics.getDhtEvictQueueCurrentSize()); - writer.writeInt(metrics.getTxThreadMapSize()); - writer.writeInt(metrics.getTxXidMapSize()); - writer.writeInt(metrics.getTxCommitQueueSize()); - writer.writeInt(metrics.getTxPrepareQueueSize()); - writer.writeInt(metrics.getTxStartVersionCountsSize()); - writer.writeInt(metrics.getTxCommittedVersionsSize()); - writer.writeInt(metrics.getTxRolledbackVersionsSize()); - writer.writeInt(metrics.getTxDhtThreadMapSize()); - writer.writeInt(metrics.getTxDhtXidMapSize()); - writer.writeInt(metrics.getTxDhtCommitQueueSize()); - writer.writeInt(metrics.getTxDhtPrepareQueueSize()); - writer.writeInt(metrics.getTxDhtStartVersionCountsSize()); - writer.writeInt(metrics.getTxDhtCommittedVersionsSize()); - writer.writeInt(metrics.getTxDhtRolledbackVersionsSize()); - writer.writeBoolean(metrics.isWriteBehindEnabled()); - writer.writeInt(metrics.getWriteBehindFlushSize()); - writer.writeInt(metrics.getWriteBehindFlushThreadCount()); - writer.writeLong(metrics.getWriteBehindFlushFrequency()); - writer.writeInt(metrics.getWriteBehindStoreBatchSize()); - writer.writeInt(metrics.getWriteBehindTotalCriticalOverflowCount()); - writer.writeInt(metrics.getWriteBehindCriticalOverflowCount()); - writer.writeInt(metrics.getWriteBehindErrorRetryCount()); - writer.writeInt(metrics.getWriteBehindBufferSize()); - writer.writeString(metrics.getKeyType()); - writer.writeString(metrics.getValueType()); - writer.writeBoolean(metrics.isStoreByValue()); - writer.writeBoolean(metrics.isStatisticsEnabled()); - writer.writeBoolean(metrics.isManagementEnabled()); - writer.writeBoolean(metrics.isReadThrough()); - writer.writeBoolean(metrics.isWriteThrough()); - writer.writeFloat(metrics.getCacheHitPercentage()); - writer.writeFloat(metrics.getCacheMissPercentage()); - - break; - - default: - super.processOutStream(type, writer); - } - } - - /** {@inheritDoc} */ - @SuppressWarnings({"IfMayBeConditional", "ConstantConditions"}) - @Override protected void processInStreamOutStream(int type, PortableRawReaderEx reader, PortableRawWriterEx writer) - throws IgniteCheckedException { - switch (type) { - case OP_GET: { - writer.writeObjectDetached(cache.get(reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_PUT: { - writer.writeObjectDetached(cache.getAndPut(reader.readObjectDetached(), reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_REPLACE: { - writer.writeObjectDetached(cache.getAndReplace(reader.readObjectDetached(), - reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_REMOVE: { - writer.writeObjectDetached(cache.getAndRemove(reader.readObjectDetached())); - - break; - } - - case OP_GET_AND_PUT_IF_ABSENT: { - writer.writeObjectDetached(cache.getAndPutIfAbsent(reader.readObjectDetached(), reader.readObjectDetached())); - - break; - } - - case OP_PEEK: { - Object key = reader.readObjectDetached(); - - CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(reader.readInt()); - - writer.writeObjectDetached(cache.localPeek(key, modes)); - - break; - } - - case OP_GET_ALL: { - Set keys = PlatformUtils.readSet(reader); - - Map entries = cache.getAll(keys); - - PlatformUtils.writeNullableMap(writer, entries); - - break; - } - - case OP_INVOKE: { - Object key = reader.readObjectDetached(); - - CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - - try { - writer.writeObjectDetached(cache.invoke(key, proc)); - } - catch (EntryProcessorException ex) - { - if (ex.getCause() instanceof PlatformNativeException) - writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause()); - else - throw ex; - } - - break; - } - - case OP_INVOKE_ALL: { - Set<Object> keys = PlatformUtils.readSet(reader); - - CacheEntryProcessor proc = platformCtx.createCacheEntryProcessor(reader.readObjectDetached(), 0); - - writeInvokeAllResult(writer, cache.invokeAll(keys, proc)); - - break; - } - - case OP_LOCK: - writer.writeLong(registerLock(cache.lock(reader.readObjectDetached()))); - - break; - - case OP_LOCK_ALL: - writer.writeLong(registerLock(cache.lockAll(PlatformUtils.readCollection(reader)))); - - break; - - default: - super.processInStreamOutStream(type, reader, writer); - } - } - - /** {@inheritDoc} */ - @Override public Exception convertException(Exception e) { - if (e instanceof CachePartialUpdateException) - return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e.getCause(), - platformCtx, keepPortable); - - if (e instanceof CachePartialUpdateCheckedException) - return new PlatformCachePartialUpdateException((CachePartialUpdateCheckedException)e, platformCtx, keepPortable); - - if (e.getCause() instanceof EntryProcessorException) - return (EntryProcessorException) e.getCause(); - - return super.convertException(e); - } - - /** - * Writes the result of InvokeAll cache method. - * - * @param writer Writer. - * @param results Results. - */ - private static void writeInvokeAllResult(PortableRawWriterEx writer, Map<Object, EntryProcessorResult> results) { - if (results == null) { - writer.writeInt(-1); - - return; - } - - writer.writeInt(results.size()); - - for (Map.Entry<Object, EntryProcessorResult> entry : results.entrySet()) { - writer.writeObjectDetached(entry.getKey()); - - EntryProcessorResult procRes = entry.getValue(); - - try { - Object res = procRes.get(); - - writer.writeBoolean(false); // No exception - - writer.writeObjectDetached(res); - } - catch (Exception ex) { - writer.writeBoolean(true); // Exception - - writeError(writer, ex); - } - } - } - - /** - * Writes an error to the writer either as a native exception, or as a couple of strings. - * @param writer Writer. - * @param ex Exception. - */ - private static void writeError(PortableRawWriterEx writer, Exception ex) { - if (ex.getCause() instanceof PlatformNativeException) - writer.writeObjectDetached(((PlatformNativeException)ex.getCause()).cause()); - else { - writer.writeObjectDetached(ex.getClass().getName()); - writer.writeObjectDetached(ex.getMessage()); - } - } - - /** <inheritDoc /> */ - @Override protected IgniteFuture currentFuture() throws IgniteCheckedException { - return cache.future(); - } - - /** <inheritDoc /> */ - @Nullable @Override protected PlatformFutureUtils.Writer futureWriter(int opId) { - if (opId == OP_GET_ALL) - return WRITER_GET_ALL; - - if (opId == OP_INVOKE) - return WRITER_INVOKE; - - if (opId == OP_INVOKE_ALL) - return WRITER_INVOKE_ALL; - - return null; - } - - /** - * Clears the contents of the cache, without notifying listeners or - * {@ignitelink javax.cache.integration.CacheWriter}s. - * - * @throws IllegalStateException if the cache is closed. - * @throws javax.cache.CacheException if there is a problem during the clear - */ - public void clear() throws IgniteCheckedException { - cache.clear(); - } - - /** - * Removes all entries. - * - * @throws org.apache.ignite.IgniteCheckedException In case of error. - */ - public void removeAll() throws IgniteCheckedException { - cache.removeAll(); - } - - /** - * Read cache size. - * - * @param peekModes Encoded peek modes. - * @param loc Local mode flag. - * @return Size. - */ - public int size(int peekModes, boolean loc) { - CachePeekMode[] modes = PlatformUtils.decodeCachePeekModes(peekModes); - - return loc ? cache.localSize(modes) : cache.size(modes); - } - - /** - * Create cache iterator. - * - * @return Cache iterator. - */ - public PlatformCacheIterator iterator() { - Iterator<Cache.Entry> iter = cache.iterator(); - - return new PlatformCacheIterator(platformCtx, iter); - } - - /** - * Create cache iterator over local entries. - * - * @param peekModes Peke modes. - * @return Cache iterator. - */ - public PlatformCacheIterator localIterator(int peekModes) { - CachePeekMode[] peekModes0 = PlatformUtils.decodeCachePeekModes(peekModes); - - Iterator<Cache.Entry> iter = cache.localEntries(peekModes0).iterator(); - - return new PlatformCacheIterator(platformCtx, iter); - } - - /** - * Enters a lock. - * - * @param id Lock id. - */ - public void enterLock(long id) throws InterruptedException { - lock(id).lockInterruptibly(); - } - - /** - * Exits a lock. - * - * @param id Lock id. - */ - public void exitLock(long id) { - lock(id).unlock(); - } - - /** - * Attempts to enter a lock. - * - * @param id Lock id. - * @param timeout Timeout, in milliseconds. -1 for infinite timeout. - */ - public boolean tryEnterLock(long id, long timeout) throws InterruptedException { - return timeout == -1 - ? lock(id).tryLock() - : lock(id).tryLock(timeout, TimeUnit.MILLISECONDS); - } - - /** - * Rebalances the cache. - * - * @param futId Future id. - */ - public void rebalance(long futId) { - PlatformFutureUtils.listen(platformCtx, cache.rebalance().chain(new C1<IgniteFuture, Object>() { - @Override public Object apply(IgniteFuture fut) { - return null; - } - }), futId, PlatformFutureUtils.TYP_OBJ, this); - } - - /** - * Unregister lock. - * - * @param id Lock id. - */ - public void closeLock(long id){ - Lock lock = lockMap.remove(id); - - assert lock != null : "Failed to unregister lock: " + id; - } - - /** - * Get lock by id. - * - * @param id Id. - * @return Lock. - */ - private Lock lock(long id) { - Lock lock = lockMap.get(id); - - assert lock != null : "Lock not found for ID: " + id; - - return lock; - } - - /** - * Registers a lock in a map. - * - * @param lock Lock to register. - * @return Registered lock id. - */ - private long registerLock(Lock lock) { - long id = LOCK_ID_GEN.incrementAndGet(); - - lockMap.put(id, lock); - - return id; - } - - /** - * Runs specified query. - */ - private PlatformQueryCursor runQuery(PortableRawReaderEx reader, Query qry) throws IgniteCheckedException { - - try { - QueryCursorEx cursor = (QueryCursorEx) cache.query(qry); - - return new PlatformQueryCursor(platformCtx, cursor, - qry.getPageSize() > 0 ? qry.getPageSize(): Query.DFLT_PAGE_SIZE); - } - catch (Exception err) { - throw PlatformUtils.unwrapQueryException(err); - } - } - - /** - * Runs specified fields query. - */ - private PlatformFieldsQueryCursor runFieldsQuery(PortableRawReaderEx reader, Query qry) - throws IgniteCheckedException { - try { - QueryCursorEx cursor = (QueryCursorEx) cache.query(qry); - - return new PlatformFieldsQueryCursor(platformCtx, cursor, - qry.getPageSize() > 0 ? qry.getPageSize() : Query.DFLT_PAGE_SIZE); - } - catch (Exception err) { - throw PlatformUtils.unwrapQueryException(err); - } - } - - /** - * Reads the query of specified type. - */ - private Query readInitialQuery(PortableRawReaderEx reader) throws IgniteCheckedException { - int typ = reader.readInt(); - - switch (typ) { - case -1: - return null; - - case OP_QRY_SCAN: - return readScanQuery(reader); - - case OP_QRY_SQL: - return readSqlQuery(reader); - - case OP_QRY_TXT: - return readTextQuery(reader); - } - - throw new IgniteCheckedException("Unsupported query type: " + typ); - } - - /** - * Reads sql query. - */ - private Query readSqlQuery(PortableRawReaderEx reader) { - boolean loc = reader.readBoolean(); - String sql = reader.readString(); - String typ = reader.readString(); - final int pageSize = reader.readInt(); - - Object[] args = readQueryArgs(reader); - - return new SqlQuery(typ, sql).setPageSize(pageSize).setArgs(args).setLocal(loc); - } - - /** - * Reads fields query. - */ - private Query readFieldsQuery(PortableRawReaderEx reader) { - boolean loc = reader.readBoolean(); - String sql = reader.readString(); - final int pageSize = reader.readInt(); - - Object[] args = readQueryArgs(reader); - - return new SqlFieldsQuery(sql).setPageSize(pageSize).setArgs(args).setLocal(loc); - } - - /** - * Reads text query. - */ - private Query readTextQuery(PortableRawReaderEx reader) { - boolean loc = reader.readBoolean(); - String txt = reader.readString(); - String typ = reader.readString(); - final int pageSize = reader.readInt(); - - return new TextQuery(typ, txt).setPageSize(pageSize).setLocal(loc); - } - - /** - * Reads scan query. - */ - private Query readScanQuery(PortableRawReaderEx reader) { - boolean loc = reader.readBoolean(); - final int pageSize = reader.readInt(); - - boolean hasPart = reader.readBoolean(); - - Integer part = hasPart ? reader.readInt() : null; - - ScanQuery qry = new ScanQuery().setPageSize(pageSize); - - qry.setPartition(part); - - Object pred = reader.readObjectDetached(); - - if (pred != null) - qry.setFilter(platformCtx.createCacheEntryFilter(pred, reader.readLong())); - - qry.setLocal(loc); - - return qry; - } - - /** - * Writes error with EntryProcessorException cause. - */ - private static class GetAllWriter implements PlatformFutureUtils.Writer { - /** <inheritDoc /> */ - @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { - assert obj instanceof Map; - - PlatformUtils.writeNullableMap(writer, (Map) obj); - } - - /** <inheritDoc /> */ - @Override public boolean canWrite(Object obj, Throwable err) { - return err == null; - } - } - - /** - * Writes error with EntryProcessorException cause. - */ - private static class EntryProcessorInvokeWriter implements PlatformFutureUtils.Writer { - /** <inheritDoc /> */ - @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { - if (err == null) { - writer.writeBoolean(false); // No error. - - writer.writeObjectDetached(obj); - } - else { - writer.writeBoolean(true); // Error. - - writeError(writer, (Exception) err); - } - } - - /** <inheritDoc /> */ - @Override public boolean canWrite(Object obj, Throwable err) { - return true; - } - } - - /** - * Writes results of InvokeAll method. - */ - private static class EntryProcessorInvokeAllWriter implements PlatformFutureUtils.Writer { - /** <inheritDoc /> */ - @Override public void write(PortableRawWriterEx writer, Object obj, Throwable err) { - writeInvokeAllResult(writer, (Map)obj); - } - - /** <inheritDoc /> */ - @Override public boolean canWrite(Object obj, Throwable err) { - return obj != null && err == null; - } - } - - /** - * Interop expiry policy. - */ - private static class InteropExpiryPolicy implements ExpiryPolicy { - /** Duration: unchanged. */ - private static final long DUR_UNCHANGED = -2; - - /** Duration: eternal. */ - private static final long DUR_ETERNAL = -1; - - /** Duration: zero. */ - private static final long DUR_ZERO = 0; - - /** Expiry for create. */ - private final Duration create; - - /** Expiry for update. */ - private final Duration update; - - /** Expiry for access. */ - private final Duration access; - - /** - * Constructor. - * - * @param create Expiry for create. - * @param update Expiry for update. - * @param access Expiry for access. - */ - public InteropExpiryPolicy(long create, long update, long access) { - this.create = convert(create); - this.update = convert(update); - this.access = convert(access); - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForCreation() { - return create; - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForUpdate() { - return update; - } - - /** {@inheritDoc} */ - @Override public Duration getExpiryForAccess() { - return access; - } - - /** - * Convert encoded duration to actual duration. - * - * @param dur Encoded duration. - * @return Actual duration. - */ - private static Duration convert(long dur) { - if (dur == DUR_UNCHANGED) - return null; - else if (dur == DUR_ETERNAL) - return Duration.ETERNAL; - else if (dur == DUR_ZERO) - return Duration.ZERO; - else { - assert dur > 0; - - return new Duration(TimeUnit.MILLISECONDS, dur); - } - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/8045c820/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java deleted file mode 100644 index 5f8ec8f..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryFilterImpl.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform.cache; - -import org.apache.ignite.Ignite; -import org.apache.ignite.internal.portable.PortableRawWriterEx; -import org.apache.ignite.internal.processors.platform.PlatformAbstractPredicate; -import org.apache.ignite.internal.processors.platform.PlatformContext; -import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; -import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.resources.IgniteInstanceResource; - -/** - * Interop filter. Delegates apply to native platform. - */ -public class PlatformCacheEntryFilterImpl extends PlatformAbstractPredicate implements PlatformCacheEntryFilter { - /** */ - private static final long serialVersionUID = 0L; - - /** - * {@link java.io.Externalizable} support. - */ - public PlatformCacheEntryFilterImpl() { - super(); - } - - /** - * Constructor. - * - * @param pred .Net portable predicate. - * @param ptr Pointer to predicate in the native platform. - * @param ctx Kernal context. - */ - public PlatformCacheEntryFilterImpl(Object pred, long ptr, PlatformContext ctx) { - super(pred, ptr, ctx); - - assert pred != null; - } - - /** {@inheritDoc} */ - @Override public boolean apply(Object k, Object v) { - try (PlatformMemory mem = ctx.memory().allocate()) { - PlatformOutputStream out = mem.output(); - - PortableRawWriterEx writer = ctx.writer(out); - - writer.writeObject(k); - writer.writeObject(v); - - out.synchronize(); - - return ctx.gateway().cacheEntryFilterApply(ptr, mem.pointer()) != 0; - } - } - - /** {@inheritDoc} */ - @Override public void onClose() { - if (ptr == 0) - return; - - assert ctx != null; - - ctx.gateway().cacheEntryFilterDestroy(ptr); - - ptr = 0; - } - - /** - * @param ignite Ignite instance. - */ - @IgniteInstanceResource - public void setIgniteInstance(Ignite ignite) { - ctx = PlatformUtils.platformContext(ignite); - - if (ptr != 0) - return; - - try (PlatformMemory mem = ctx.memory().allocate()) { - PlatformOutputStream out = mem.output(); - - PortableRawWriterEx writer = ctx.writer(out); - - writer.writeObject(pred); - - out.synchronize(); - - ptr = ctx.gateway().cacheEntryFilterCreate(mem.pointer()); - } - } -} \ No newline at end of file