Platforms: reworked PlatformCacheEntryProcessor interface.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/0e25f55c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/0e25f55c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/0e25f55c Branch: refs/heads/ignite-1093-2 Commit: 0e25f55cfa33ae4af7c52bd4e2ee49297ef623f4 Parents: 980a934 Author: vozerov-gridgain <[email protected]> Authored: Tue Sep 1 11:41:28 2015 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Sep 1 11:41:28 2015 +0300 ---------------------------------------------------------------------- .../processors/platform/PlatformContext.java | 3 +- .../cache/PlatformCacheEntryProcessor.java | 27 +++ .../cache/PlatformCacheEntryProcessor.java | 220 ------------------- .../cache/PlatformCacheEntryProcessorImpl.java | 220 +++++++++++++++++++ 4 files changed, 249 insertions(+), 221 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/0e25f55c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java index 4c70360..cea8326 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformContext.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.portable.PortableRawReaderEx; import org.apache.ignite.internal.portable.PortableRawWriterEx; import org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryFilterEx; import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryFilter; +import org.apache.ignite.internal.processors.platform.cache.PlatformCacheEntryProcessor; import org.apache.ignite.internal.processors.platform.cache.query.PlatformContinuousQuery; import org.apache.ignite.internal.processors.platform.callback.PlatformCallbackGateway; import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterNodeFilter; @@ -248,7 +249,7 @@ public interface PlatformContext { * @param ptr Pointer. * @return Entry processor. */ - public CacheEntryProcessor createCacheEntryProcessor(Object proc, long ptr); + public PlatformCacheEntryProcessor createCacheEntryProcessor(Object proc, long ptr); /** * Create cache entry filter. http://git-wip-us.apache.org/repos/asf/ignite/blob/0e25f55c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java new file mode 100644 index 0000000..3d8022f --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache; + +import org.apache.ignite.cache.CacheEntryProcessor; + +/** + * Platform cache entry processor marker interface. + */ +public interface PlatformCacheEntryProcessor extends CacheEntryProcessor { + // No-op. +} http://git-wip-us.apache.org/repos/asf/ignite/blob/0e25f55c/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java deleted file mode 100644 index fd14632..0000000 --- a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessor.java +++ /dev/null @@ -1,220 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.processors.platform.cache; - -import java.io.Externalizable; -import java.io.IOException; -import java.io.ObjectInput; -import java.io.ObjectOutput; -import javax.cache.processor.EntryProcessorException; -import javax.cache.processor.MutableEntry; -import org.apache.ignite.Ignite; -import org.apache.ignite.IgniteCheckedException; -import org.apache.ignite.cache.CacheEntryProcessor; -import org.apache.ignite.internal.IgniteKernal; -import org.apache.ignite.internal.portable.PortableRawReaderEx; -import org.apache.ignite.internal.portable.PortableRawWriterEx; -import org.apache.ignite.internal.processors.platform.PlatformContext; -import org.apache.ignite.internal.processors.platform.PlatformProcessor; -import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; -import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; -import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; -import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; -import org.apache.ignite.internal.util.typedef.internal.U; - -/** - * Interop cache entry processor. Delegates processing to native platform. - */ -public class PlatformCacheEntryProcessor<K, V, T> implements CacheEntryProcessor<K, V, T>, Externalizable { - /** */ - private static final long serialVersionUID = 0L; - - /** Indicates that entry has not been modified */ - private static final byte ENTRY_STATE_INTACT = 0; - - /** Indicates that entry value has been set */ - private static final byte ENTRY_STATE_VALUE_SET = 1; - - /** Indicates that remove has been called on an entry */ - private static final byte ENTRY_STATE_REMOVED = 2; - - /** Indicates error in processor that is written as portable. */ - private static final byte ENTRY_STATE_ERR_PORTABLE = 3; - - /** Indicates error in processor that is written as string. */ - private static final byte ENTRY_STATE_ERR_STRING = 4; - - /** Native portable processor */ - private Object proc; - - /** Pointer to processor in the native platform. */ - private transient long ptr; - - /** - * {@link java.io.Externalizable} support. - */ - public PlatformCacheEntryProcessor() { - // No-op. - } - - /** - * Constructor. - * - * @param proc Native portable processor - * @param ptr Pointer to processor in the native platform. - */ - public PlatformCacheEntryProcessor(Object proc, long ptr) { - this.proc = proc; - this.ptr = ptr; - } - - /** {@inheritDoc} */ - @Override public T process(MutableEntry<K, V> entry, Object... arguments) throws EntryProcessorException { - try { - IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class); - - PlatformProcessor interopProc; - - try { - interopProc = PlatformUtils.platformProcessor(ignite); - } - catch (IllegalStateException ex){ - throw new EntryProcessorException(ex); - } - - interopProc.awaitStart(); - - return execute0(interopProc.context(), entry); - } - catch (IgniteCheckedException e) { - throw U.convertException(e); - } - } - - /** - * Executes interop entry processor on a given entry, updates entry and returns result. - * - * @param ctx Context. - * @param entry Entry. - * @return Processing result. - * @throws org.apache.ignite.IgniteCheckedException - */ - private T execute0(PlatformContext ctx, MutableEntry<K, V> entry) - throws IgniteCheckedException { - try (PlatformMemory outMem = ctx.memory().allocate()) { - PlatformOutputStream out = outMem.output(); - - PortableRawWriterEx writer = ctx.writer(out); - - writeEntryAndProcessor(entry, writer); - - out.synchronize(); - - try (PlatformMemory inMem = ctx.memory().allocate()) { - PlatformInputStream in = inMem.input(); - - ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer()); - - in.synchronize(); - - PortableRawReaderEx reader = ctx.reader(in); - - return readResultAndUpdateEntry(ctx, entry, reader); - } - } - } - - /** - * Writes mutable entry and entry processor to the stream. - * - * @param entry Entry to process. - * @param writer Writer. - */ - private void writeEntryAndProcessor(MutableEntry<K, V> entry, PortableRawWriterEx writer) { - writer.writeObject(entry.getKey()); - writer.writeObject(entry.getValue()); - - if (ptr != 0) { - // Execute locally - we have a pointer to native processor. - writer.writeBoolean(true); - writer.writeLong(ptr); - } - else { - // We are on a remote node. Send processor holder back to native. - writer.writeBoolean(false); - writer.writeObject(proc); - } - } - - /** - * Reads processing result from stream, updates mutable entry accordingly, and returns the result. - * - * @param entry Mutable entry to update. - * @param reader Reader. - * @return Entry processing result - * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code. - */ - @SuppressWarnings("unchecked") - private T readResultAndUpdateEntry(PlatformContext ctx, MutableEntry<K, V> entry, PortableRawReaderEx reader) { - byte state = reader.readByte(); - - switch (state) { - case ENTRY_STATE_VALUE_SET: - entry.setValue((V)reader.readObject()); - - break; - - case ENTRY_STATE_REMOVED: - entry.remove(); - - break; - - case ENTRY_STATE_ERR_PORTABLE: - // Full exception - Object nativeErr = reader.readObjectDetached(); - - assert nativeErr != null; - - throw new EntryProcessorException("Failed to execute native cache entry processor.", - ctx.createNativeException(nativeErr)); - - case ENTRY_STATE_ERR_STRING: - // Native exception was not serializable, we have only message. - String errMsg = reader.readString(); - - assert errMsg != null; - - throw new EntryProcessorException("Failed to execute native cache entry processor: " + errMsg); - - default: - assert state == ENTRY_STATE_INTACT; - } - - return (T)reader.readObject(); - } - - /** {@inheritDoc} */ - @Override public void writeExternal(ObjectOutput out) throws IOException { - out.writeObject(proc); - } - - /** {@inheritDoc} */ - @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { - proc = in.readObject(); - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/0e25f55c/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java new file mode 100644 index 0000000..16124fe --- /dev/null +++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cache/PlatformCacheEntryProcessorImpl.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import javax.cache.processor.EntryProcessorException; +import javax.cache.processor.MutableEntry; +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.IgniteKernal; +import org.apache.ignite.internal.portable.PortableRawReaderEx; +import org.apache.ignite.internal.portable.PortableRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.PlatformProcessor; +import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.internal.util.typedef.internal.U; + +/** + * Platform cache entry processor. Delegates processing to native platform. + */ +public class PlatformCacheEntryProcessorImpl implements PlatformCacheEntryProcessor, Externalizable { + /** */ + private static final long serialVersionUID = 0L; + + /** Indicates that entry has not been modified */ + private static final byte ENTRY_STATE_INTACT = 0; + + /** Indicates that entry value has been set */ + private static final byte ENTRY_STATE_VALUE_SET = 1; + + /** Indicates that remove has been called on an entry */ + private static final byte ENTRY_STATE_REMOVED = 2; + + /** Indicates error in processor that is written as portable. */ + private static final byte ENTRY_STATE_ERR_PORTABLE = 3; + + /** Indicates error in processor that is written as string. */ + private static final byte ENTRY_STATE_ERR_STRING = 4; + + /** Native portable processor */ + private Object proc; + + /** Pointer to processor in the native platform. */ + private transient long ptr; + + /** + * {@link java.io.Externalizable} support. + */ + public PlatformCacheEntryProcessorImpl() { + // No-op. + } + + /** + * Constructor. + * + * @param proc Native portable processor + * @param ptr Pointer to processor in the native platform. + */ + public PlatformCacheEntryProcessorImpl(Object proc, long ptr) { + this.proc = proc; + this.ptr = ptr; + } + + /** {@inheritDoc} */ + @Override public Object process(MutableEntry entry, Object... args) + throws EntryProcessorException { + try { + IgniteKernal ignite = (IgniteKernal)entry.unwrap(Ignite.class); + + PlatformProcessor interopProc; + + try { + interopProc = PlatformUtils.platformProcessor(ignite); + } + catch (IllegalStateException ex){ + throw new EntryProcessorException(ex); + } + + interopProc.awaitStart(); + + return execute0(interopProc.context(), entry); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** + * Executes interop entry processor on a given entry, updates entry and returns result. + * + * @param ctx Context. + * @param entry Entry. + * @return Processing result. + * @throws org.apache.ignite.IgniteCheckedException + */ + private Object execute0(PlatformContext ctx, MutableEntry entry) + throws IgniteCheckedException { + try (PlatformMemory outMem = ctx.memory().allocate()) { + PlatformOutputStream out = outMem.output(); + + PortableRawWriterEx writer = ctx.writer(out); + + writeEntryAndProcessor(entry, writer); + + out.synchronize(); + + try (PlatformMemory inMem = ctx.memory().allocate()) { + PlatformInputStream in = inMem.input(); + + ctx.gateway().cacheInvoke(outMem.pointer(), inMem.pointer()); + + in.synchronize(); + + PortableRawReaderEx reader = ctx.reader(in); + + return readResultAndUpdateEntry(ctx, entry, reader); + } + } + } + + /** + * Writes mutable entry and entry processor to the stream. + * + * @param entry Entry to process. + * @param writer Writer. + */ + private void writeEntryAndProcessor(MutableEntry entry, PortableRawWriterEx writer) { + writer.writeObject(entry.getKey()); + writer.writeObject(entry.getValue()); + + if (ptr != 0) { + // Execute locally - we have a pointer to native processor. + writer.writeBoolean(true); + writer.writeLong(ptr); + } + else { + // We are on a remote node. Send processor holder back to native. + writer.writeBoolean(false); + writer.writeObject(proc); + } + } + + /** + * Reads processing result from stream, updates mutable entry accordingly, and returns the result. + * + * @param entry Mutable entry to update. + * @param reader Reader. + * @return Entry processing result + * @throws javax.cache.processor.EntryProcessorException If processing has failed in user code. + */ + @SuppressWarnings("unchecked") + private Object readResultAndUpdateEntry(PlatformContext ctx, MutableEntry entry, PortableRawReaderEx reader) { + byte state = reader.readByte(); + + switch (state) { + case ENTRY_STATE_VALUE_SET: + entry.setValue(reader.readObject()); + + break; + + case ENTRY_STATE_REMOVED: + entry.remove(); + + break; + + case ENTRY_STATE_ERR_PORTABLE: + // Full exception + Object nativeErr = reader.readObjectDetached(); + + assert nativeErr != null; + + throw new EntryProcessorException("Failed to execute native cache entry processor.", + ctx.createNativeException(nativeErr)); + + case ENTRY_STATE_ERR_STRING: + // Native exception was not serializable, we have only message. + String errMsg = reader.readString(); + + assert errMsg != null; + + throw new EntryProcessorException("Failed to execute native cache entry processor: " + errMsg); + + default: + assert state == ENTRY_STATE_INTACT; + } + + return reader.readObject(); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(proc); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + proc = in.readObject(); + } +} \ No newline at end of file
