IGNITE-1563 .NET: Implemented "atomics": AtomicReference and AtomicSequence. This closes #455.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/f7c1296c Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/f7c1296c Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/f7c1296c Branch: refs/heads/ignite-1786 Commit: f7c1296cceba73ce1b61af605e476a905a0c8ab4 Parents: e2e216d Author: Pavel Tupitsyn <[email protected]> Authored: Tue Feb 9 14:43:00 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Tue Feb 9 14:43:00 2016 +0300 ---------------------------------------------------------------------- .../GridCacheAtomicReferenceImpl.java | 68 ++---- .../platform/PlatformNoopProcessor.java | 10 + .../processors/platform/PlatformProcessor.java | 20 ++ .../platform/PlatformProcessorImpl.java | 18 ++ .../callback/PlatformCallbackUtils.java | 1 - .../datastructures/PlatformAtomicReference.java | 141 +++++++++++ .../datastructures/PlatformAtomicSequence.java | 122 ++++++++++ .../cpp/common/include/ignite/common/exports.h | 15 ++ .../cpp/common/include/ignite/common/java.h | 32 +++ .../platforms/cpp/common/project/vs/module.def | 15 +- modules/platforms/cpp/common/src/exports.cpp | 52 ++++ modules/platforms/cpp/common/src/java.cpp | 181 ++++++++++++++ .../Apache.Ignite.Core.Tests.csproj | 2 + .../DataStructures/AtomicReferenceTest.cs | 239 +++++++++++++++++++ .../DataStructures/AtomicSequenceTest.cs | 131 ++++++++++ .../Apache.Ignite.Core.csproj | 4 + .../DataStructures/IAtomicReference.cs | 64 +++++ .../DataStructures/IAtomicSequence.cs | 69 ++++++ .../dotnet/Apache.Ignite.Core/IIgnite.cs | 28 +++ .../Impl/DataStructures/AtomicReference.cs | 92 +++++++ .../Impl/DataStructures/AtomicSequence.cs | 90 +++++++ .../dotnet/Apache.Ignite.Core/Impl/Ignite.cs | 50 ++++ .../Apache.Ignite.Core/Impl/IgniteProxy.cs | 12 + .../Impl/Unmanaged/IgniteJniNativeMethods.cs | 37 +++ .../Impl/Unmanaged/UnmanagedUtils.cs | 79 ++++++ 25 files changed, 1526 insertions(+), 46 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java index 37cdaea..e044138 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/datastructures/GridCacheAtomicReferenceImpl.java @@ -35,8 +35,6 @@ import org.apache.ignite.internal.util.typedef.internal.CU; import org.apache.ignite.internal.util.typedef.internal.S; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lang.IgniteBiTuple; -import org.apache.ignite.lang.IgniteClosure; -import org.apache.ignite.lang.IgnitePredicate; import static org.apache.ignite.transactions.TransactionConcurrency.PESSIMISTIC; import static org.apache.ignite.transactions.TransactionIsolation.REPEATABLE_READ; @@ -153,10 +151,20 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef /** {@inheritDoc} */ @Override public boolean compareAndSet(T expVal, T newVal) { + return compareAndSetAndGet(newVal, expVal) == expVal; + } + + /** + * Compares current value with specified value for equality and, if they are equal, replaces current value. + * + * @param newVal New value to set. + * @return Original value. 
+ */ + public T compareAndSetAndGet(T newVal, T expVal) { checkRemoved(); try { - return CU.outTx(internalCompareAndSet(wrapperPredicate(expVal), wrapperClosure(newVal)), ctx); + return CU.outTx(internalCompareAndSetAndGet(expVal, newVal), ctx); } catch (IgniteCheckedException e) { throw U.convertException(e); @@ -197,34 +205,6 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef } /** - * Method make wrapper predicate for existing value. - * - * @param val Value. - * @return Predicate. - */ - private IgnitePredicate<T> wrapperPredicate(final T val) { - return new IgnitePredicate<T>() { - @Override public boolean apply(T e) { - return F.eq(val, e); - } - }; - } - - /** - * Method make wrapper closure for existing value. - * - * @param val Value. - * @return Closure. - */ - private IgniteClosure<T, T> wrapperClosure(final T val) { - return new IgniteClosure<T, T>() { - @Override public T apply(T e) { - return val; - } - }; - } - - /** * Method returns callable for execution {@link #set(Object)} operation in async and sync mode. * * @param val Value will be set in reference . @@ -260,39 +240,39 @@ public final class GridCacheAtomicReferenceImpl<T> implements GridCacheAtomicRef * Conditionally sets the new value. It will be set if {@code expValPred} is * evaluate to {@code true}. * - * @param expValPred Predicate which should evaluate to {@code true} for value to be set. - * @param newValClos Closure which generates new value. + * @param expVal Expected value. + * @param newVal New value. * @return Callable for execution in async and sync mode. 
*/ - private Callable<Boolean> internalCompareAndSet(final IgnitePredicate<T> expValPred, - final IgniteClosure<T, T> newValClos) { - - return retryTopologySafe(new Callable<Boolean>() { - @Override public Boolean call() throws Exception { + private Callable<T> internalCompareAndSetAndGet(final T expVal, final T newVal) { + return retryTopologySafe(new Callable<T>() { + @Override public T call() throws Exception { try (IgniteInternalTx tx = CU.txStartInternal(ctx, atomicView, PESSIMISTIC, REPEATABLE_READ)) { GridCacheAtomicReferenceValue<T> ref = atomicView.get(key); if (ref == null) throw new IgniteCheckedException("Failed to find atomic reference with given name: " + name); - if (!expValPred.apply(ref.get())) { + T origVal = ref.get(); + + if (!F.eq(expVal, origVal)) { tx.setRollbackOnly(); - return false; + return origVal; } else { - ref.set(newValClos.apply(ref.get())); + ref.set(newVal); atomicView.getAndPut(key, ref); tx.commit(); - return true; + return expVal; } } catch (Error | Exception e) { - U.error(log, "Failed to compare and value [expValPred=" + expValPred + ", newValClos" + - newValClos + ", atomicReference" + this + ']', e); + U.error(log, "Failed to compare and value [expVal=" + expVal + ", newVal" + + newVal + ", atomicReference" + this + ']', e); throw e; } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java index b25e32e..8fe17e1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformNoopProcessor.java @@ 
-148,4 +148,14 @@ public class PlatformNoopProcessor extends GridProcessorAdapter implements Platf @Override public void getIgniteConfiguration(long memPtr) { // No-op. } + + /** {@inheritDoc} */ + @Override public PlatformTarget atomicSequence(String name, long initVal, boolean create) throws IgniteException { + return null; + } + + /** {@inheritDoc} */ + @Override public PlatformTarget atomicReference(String name, long memPtr, boolean create) throws IgniteException { + return null; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java index b59d93d..2d51c69 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessor.java @@ -208,6 +208,26 @@ public interface PlatformProcessor extends GridProcessor { public PlatformTarget atomicLong(String name, long initVal, boolean create) throws IgniteException; /** + * Get or create AtomicSequence. + * @param name Name. + * @param initVal Initial value. + * @param create Create flag. + * @return Platform atomic long. + * @throws IgniteException + */ + public PlatformTarget atomicSequence(String name, long initVal, boolean create) throws IgniteException; + + /** + * Get or create AtomicReference. + * @param name Name. + * @param memPtr Pointer to a stream with initial value. 0 for null initial value. + * @param create Create flag. + * @return Platform atomic long. 
+ * @throws IgniteException + */ + public PlatformTarget atomicReference(String name, long memPtr, boolean create) throws IgniteException; + + /** * Gets the configuration of the current Ignite instance. * * @param memPtr Stream to write data to. http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java index 4ed8c25..d0e0a63 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/PlatformProcessorImpl.java @@ -18,6 +18,7 @@ package org.apache.ignite.internal.processors.platform; import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteAtomicSequence; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteDataStreamer; import org.apache.ignite.IgniteException; @@ -39,6 +40,8 @@ import org.apache.ignite.internal.processors.platform.cluster.PlatformClusterGro import org.apache.ignite.internal.processors.platform.compute.PlatformCompute; import org.apache.ignite.internal.processors.platform.datastreamer.PlatformDataStreamer; import org.apache.ignite.internal.processors.platform.datastructures.PlatformAtomicLong; +import org.apache.ignite.internal.processors.platform.datastructures.PlatformAtomicReference; +import org.apache.ignite.internal.processors.platform.datastructures.PlatformAtomicSequence; import org.apache.ignite.internal.processors.platform.dotnet.PlatformDotNetCacheStore; import org.apache.ignite.internal.processors.platform.events.PlatformEvents; import 
org.apache.ignite.internal.processors.platform.memory.PlatformMemory; @@ -361,6 +364,21 @@ public class PlatformProcessorImpl extends GridProcessorAdapter implements Platf } /** {@inheritDoc} */ + @Override public PlatformTarget atomicSequence(String name, long initVal, boolean create) throws IgniteException { + IgniteAtomicSequence atomicSeq = ignite().atomicSequence(name, initVal, create); + + if (atomicSeq == null) + return null; + + return new PlatformAtomicSequence(platformCtx, atomicSeq); + } + + /** {@inheritDoc} */ + @Override public PlatformTarget atomicReference(String name, long memPtr, boolean create) throws IgniteException { + return PlatformAtomicReference.createInstance(platformCtx, name, memPtr, create); + } + + /** {@inheritDoc} */ @Override public void getIgniteConfiguration(long memPtr) { PlatformOutputStream stream = platformCtx.memory().get(memPtr).output(); BinaryRawWriterEx writer = platformCtx.writer(stream); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java index 7f3ba6f..3112e0f 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java @@ -442,7 +442,6 @@ public class PlatformCallbackUtils { static native void serviceCancel(long envPtr, long svcPtr, long memPtr); /** - /** * Invokes service method. * * @param envPtr Environment pointer. 
http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java new file mode 100644 index 0000000..81b7570 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference.java @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.datastructures; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.processors.datastructures.GridCacheAtomicReferenceImpl; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; + +/** + * Platform atomic reference wrapper. + */ +@SuppressWarnings("unchecked") +public class PlatformAtomicReference extends PlatformAbstractTarget { + /** */ + private static final int OP_GET = 1; + + /** */ + private static final int OP_SET = 2; + + /** */ + private static final int OP_COMPARE_AND_SET_AND_GET = 3; + + /** */ + private final GridCacheAtomicReferenceImpl atomicRef; + + /** + * Creates an instance or returns null. + * + * @param ctx Context. + * @param name Name. + * @param memPtr Pointer to a stream with initial value. 0 for default value. + * @param create Create flag. + * @return Instance of a PlatformAtomicReference, or null when Ignite reference with specific name is null. + */ + public static PlatformAtomicReference createInstance(PlatformContext ctx, String name, long memPtr, + boolean create) { + assert ctx != null; + assert name != null; + + Object initVal = null; + + if (memPtr != 0) { + try (PlatformMemory mem = ctx.memory().get(memPtr)) { + initVal = ctx.reader(mem).readObjectDetached(); + } + } + + GridCacheAtomicReferenceImpl atomicRef = + (GridCacheAtomicReferenceImpl)ctx.kernalContext().grid().atomicReference(name, initVal, create); + + if (atomicRef == null) + return null; + + return new PlatformAtomicReference(ctx, atomicRef); + } + + /** + * Ctor. + * + * @param ctx Context. + * @param ref Atomic reference to wrap. 
+ */ + private PlatformAtomicReference(PlatformContext ctx, GridCacheAtomicReferenceImpl ref) { + super(ctx); + + assert ref != null; + + atomicRef = ref; + } + + /** + * Returns a value indicating whether this instance has been closed. + * + * @return Value indicating whether this instance has been closed. + */ + public boolean isClosed() { + return atomicRef.removed(); + } + + /** + * Closes this instance. + */ + public void close() { + atomicRef.close(); + } + + /** {@inheritDoc} */ + @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + if (type == OP_GET) + writer.writeObject(atomicRef.get()); + else + super.processOutStream(type, writer); + } + + /** {@inheritDoc} */ + @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) + throws IgniteCheckedException { + if (type == OP_SET) { + atomicRef.set(reader.readObjectDetached()); + + return 0; + } + + return super.processInStreamOutLong(type, reader); + } + + /** {@inheritDoc} */ + @Override protected void processInStreamOutStream(int type, BinaryRawReaderEx reader, + BinaryRawWriterEx writer) throws IgniteCheckedException { + if (type == OP_COMPARE_AND_SET_AND_GET) { + Object val = reader.readObjectDetached(); + final Object cmp = reader.readObjectDetached(); + + Object res = atomicRef.compareAndSetAndGet(val, cmp); + + writer.writeObject(res); + } + else + super.processInStreamOutStream(type, reader, writer); + } +} + http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java new file mode 100644 
index 0000000..ce7e364 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.datastructures; + +import org.apache.ignite.IgniteAtomicSequence; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; + +/** + * Platform atomic sequence wrapper. + */ +public class PlatformAtomicSequence extends PlatformAbstractTarget { + /** */ + private final IgniteAtomicSequence atomicSeq; + + /** + * Ctor. + * @param ctx Context. + * @param atomicSeq AtomicSequence to wrap. + */ + public PlatformAtomicSequence(PlatformContext ctx, IgniteAtomicSequence atomicSeq) { + super(ctx); + + assert atomicSeq != null; + + this.atomicSeq = atomicSeq; + } + + /** + * Reads the value. + * + * @return Current atomic sequence value. + */ + public long get() { + return atomicSeq.get(); + } + + /** + * Increments and reads the value. + * + * @return Current atomic sequence value. 
+ */ + public long incrementAndGet() { + return atomicSeq.incrementAndGet(); + } + + /** + * Reads and increments the value. + * + * @return Original atomic sequence value. + */ + public long getAndIncrement() { + return atomicSeq.getAndIncrement(); + } + + /** + * Adds a value. + * + * @return Current atomic sequence value. + */ + public long addAndGet(long l) { + return atomicSeq.addAndGet(l); + } + + /** + * Adds a value. + * + * @return Original atomic sequence value. + */ + public long getAndAdd(long l) { + return atomicSeq.getAndAdd(l); + } + + /** + * Gets the batch size. + * + * @return Batch size. + */ + public int getBatchSize() { + return atomicSeq.batchSize(); + } + + /** + * Sets the batch size. + * + * @param size Batch size. + */ + public void setBatchSize(int size) { + atomicSeq.batchSize(size); + } + + /** + * Gets status of atomic. + * + * @return {@code true} if atomic was removed from cache, {@code false} in other case. + */ + public boolean isClosed() { + return atomicSeq.removed(); + } + + /** + * Removes this atomic. 
+ */ + public void close() { + atomicSeq.close(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/cpp/common/include/ignite/common/exports.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/common/exports.h b/modules/platforms/cpp/common/include/ignite/common/exports.h index 66f918f..15911a6 100644 --- a/modules/platforms/cpp/common/include/ignite/common/exports.h +++ b/modules/platforms/cpp/common/include/ignite/common/exports.h @@ -48,6 +48,8 @@ extern "C" { void* IGNITE_CALL IgniteProcessorServices(gcj::JniContext* ctx, void* obj, void* prj); void* IGNITE_CALL IgniteProcessorExtensions(gcj::JniContext* ctx, void* obj); void* IGNITE_CALL IgniteProcessorAtomicLong(gcj::JniContext* ctx, void* obj, char* name, long long initVal, bool create); + void* IGNITE_CALL IgniteProcessorAtomicSequence(gcj::JniContext* ctx, void* obj, char* name, long long initVal, bool create); + void* IGNITE_CALL IgniteProcessorAtomicReference(gcj::JniContext* ctx, void* obj, char* name, long long memPtr, bool create); void IGNITE_CALL IgniteProcessorGetIgniteConfiguration(gcj::JniContext* ctx, void* obj, long memPtr); long long IGNITE_CALL IgniteTargetInStreamOutLong(gcj::JniContext* ctx, void* obj, int opType, long long memPtr); @@ -160,6 +162,19 @@ extern "C" { bool IGNITE_CALL IgniteAtomicLongIsClosed(gcj::JniContext* ctx, void* obj); void IGNITE_CALL IgniteAtomicLongClose(gcj::JniContext* ctx, void* obj); + long long IGNITE_CALL IgniteAtomicSequenceGet(gcj::JniContext* ctx, void* obj); + long long IGNITE_CALL IgniteAtomicSequenceIncrementAndGet(gcj::JniContext* ctx, void* obj); + long long IGNITE_CALL IgniteAtomicSequenceGetAndIncrement(gcj::JniContext* ctx, void* obj); + long long IGNITE_CALL IgniteAtomicSequenceAddAndGet(gcj::JniContext* ctx, void* obj, long long l); + long long IGNITE_CALL IgniteAtomicSequenceGetAndAdd(gcj::JniContext* ctx, void* obj, long long l); + 
int IGNITE_CALL IgniteAtomicSequenceGetBatchSize(gcj::JniContext* ctx, void* obj); + void IGNITE_CALL IgniteAtomicSequenceSetBatchSize(gcj::JniContext* ctx, void* obj, int size); + bool IGNITE_CALL IgniteAtomicSequenceIsClosed(gcj::JniContext* ctx, void* obj); + void IGNITE_CALL IgniteAtomicSequenceClose(gcj::JniContext* ctx, void* obj); + + bool IGNITE_CALL IgniteAtomicReferenceIsClosed(gcj::JniContext* ctx, void* obj); + void IGNITE_CALL IgniteAtomicReferenceClose(gcj::JniContext* ctx, void* obj); + bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj); bool IGNITE_CALL IgniteListenableIsCancelled(gcj::JniContext* ctx, void* obj); } http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/cpp/common/include/ignite/common/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/include/ignite/common/java.h b/modules/platforms/cpp/common/include/ignite/common/java.h index 072a8ef..8f5823e 100644 --- a/modules/platforms/cpp/common/include/ignite/common/java.h +++ b/modules/platforms/cpp/common/include/ignite/common/java.h @@ -311,6 +311,8 @@ namespace ignite jmethodID m_PlatformProcessor_extensions; jmethodID m_PlatformProcessor_atomicLong; jmethodID m_PlatformProcessor_getIgniteConfiguration; + jmethodID m_PlatformProcessor_atomicSequence; + jmethodID m_PlatformProcessor_atomicReference; jclass c_PlatformTarget; jmethodID m_PlatformTarget_inStreamOutLong; @@ -353,6 +355,21 @@ namespace ignite jmethodID m_PlatformAtomicLong_isClosed; jmethodID m_PlatformAtomicLong_close; + jclass c_PlatformAtomicSequence; + jmethodID m_PlatformAtomicSequence_get; + jmethodID m_PlatformAtomicSequence_incrementAndGet; + jmethodID m_PlatformAtomicSequence_getAndIncrement; + jmethodID m_PlatformAtomicSequence_addAndGet; + jmethodID m_PlatformAtomicSequence_getAndAdd; + jmethodID m_PlatformAtomicSequence_getBatchSize; + jmethodID m_PlatformAtomicSequence_setBatchSize; + jmethodID 
m_PlatformAtomicSequence_isClosed; + jmethodID m_PlatformAtomicSequence_close; + + jclass c_PlatformAtomicReference; + jmethodID m_PlatformAtomicReference_isClosed; + jmethodID m_PlatformAtomicReference_close; + jclass c_PlatformListenable; jmethodID m_PlatformListenable_cancel; jmethodID m_PlatformListenable_isCancelled; @@ -507,6 +524,8 @@ namespace ignite jobject ProcessorServices(jobject obj, jobject prj); jobject ProcessorExtensions(jobject obj); jobject ProcessorAtomicLong(jobject obj, char* name, long long initVal, bool create); + jobject ProcessorAtomicSequence(jobject obj, char* name, long long initVal, bool create); + jobject ProcessorAtomicReference(jobject obj, char* name, long long memPtr, bool create); void ProcessorGetIgniteConfiguration(jobject obj, long memPtr); long long TargetInStreamOutLong(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL); @@ -608,6 +627,19 @@ namespace ignite bool AtomicLongIsClosed(jobject obj); void AtomicLongClose(jobject obj); + long long AtomicSequenceGet(jobject obj); + long long AtomicSequenceIncrementAndGet(jobject obj); + long long AtomicSequenceGetAndIncrement(jobject obj); + long long AtomicSequenceAddAndGet(jobject obj, long long l); + long long AtomicSequenceGetAndAdd(jobject obj, long long l); + int AtomicSequenceGetBatchSize(jobject obj); + void AtomicSequenceSetBatchSize(jobject obj, int size); + bool AtomicSequenceIsClosed(jobject obj); + void AtomicSequenceClose(jobject obj); + + bool AtomicReferenceIsClosed(jobject obj); + void AtomicReferenceClose(jobject obj); + bool ListenableCancel(jobject obj); bool ListenableIsCancelled(jobject obj); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/cpp/common/project/vs/module.def ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/project/vs/module.def b/modules/platforms/cpp/common/project/vs/module.def index 81df027..21a4994 100644 --- 
a/modules/platforms/cpp/common/project/vs/module.def +++ b/modules/platforms/cpp/common/project/vs/module.def @@ -116,4 +116,17 @@ IgniteTargetListenFutureForOperationAndGet @113 IgniteProcessorCreateCacheFromConfig @114 IgniteProcessorGetOrCreateCacheFromConfig @115 IgniteProcessorGetIgniteConfiguration @116 -IgniteProcessorDestroyCache @117 \ No newline at end of file +IgniteProcessorDestroyCache @117 +IgniteProcessorAtomicSequence @118 +IgniteAtomicSequenceGet @119 +IgniteAtomicSequenceIncrementAndGet @120 +IgniteAtomicSequenceGetAndIncrement @121 +IgniteAtomicSequenceAddAndGet @122 +IgniteAtomicSequenceGetAndAdd @123 +IgniteAtomicSequenceGetBatchSize @124 +IgniteAtomicSequenceSetBatchSize @125 +IgniteAtomicSequenceIsClosed @126 +IgniteAtomicSequenceClose @127 +IgniteProcessorAtomicReference @128 +IgniteAtomicReferenceIsClosed @129 +IgniteAtomicReferenceClose @130 http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/cpp/common/src/exports.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/src/exports.cpp b/modules/platforms/cpp/common/src/exports.cpp index e9ec519..fff2a16 100644 --- a/modules/platforms/cpp/common/src/exports.cpp +++ b/modules/platforms/cpp/common/src/exports.cpp @@ -114,6 +114,14 @@ extern "C" { return ctx->ProcessorAtomicLong(static_cast<jobject>(obj), name, initVal, create); } + void* IGNITE_CALL IgniteProcessorAtomicSequence(gcj::JniContext* ctx, void* obj, char* name, long long initVal, bool create) { + return ctx->ProcessorAtomicSequence(static_cast<jobject>(obj), name, initVal, create); + } + + void* IGNITE_CALL IgniteProcessorAtomicReference(gcj::JniContext* ctx, void* obj, char* name, long long memPtr, bool create) { + return ctx->ProcessorAtomicReference(static_cast<jobject>(obj), name, memPtr, create); + } + void IGNITE_CALL IgniteProcessorGetIgniteConfiguration(gcj::JniContext* ctx, void* obj, long memPtr) { return 
ctx->ProcessorGetIgniteConfiguration(static_cast<jobject>(obj), memPtr); } @@ -482,6 +490,50 @@ extern "C" { void IGNITE_CALL IgniteAtomicLongClose(gcj::JniContext* ctx, void* obj) { return ctx->AtomicLongClose(static_cast<jobject>(obj)); } + + long long IGNITE_CALL IgniteAtomicSequenceGet(gcj::JniContext* ctx, void* obj) { + return ctx->AtomicSequenceGet(static_cast<jobject>(obj)); + } + + long long IGNITE_CALL IgniteAtomicSequenceIncrementAndGet(gcj::JniContext* ctx, void* obj) { + return ctx->AtomicSequenceIncrementAndGet(static_cast<jobject>(obj)); + } + + long long IGNITE_CALL IgniteAtomicSequenceGetAndIncrement(gcj::JniContext* ctx, void* obj) { + return ctx->AtomicSequenceGetAndIncrement(static_cast<jobject>(obj)); + } + + long long IGNITE_CALL IgniteAtomicSequenceAddAndGet(gcj::JniContext* ctx, void* obj, long long l) { + return ctx->AtomicSequenceAddAndGet(static_cast<jobject>(obj), l); + } + + long long IGNITE_CALL IgniteAtomicSequenceGetAndAdd(gcj::JniContext* ctx, void* obj, long long l) { + return ctx->AtomicSequenceGetAndAdd(static_cast<jobject>(obj), l); + } + + int IGNITE_CALL IgniteAtomicSequenceGetBatchSize(gcj::JniContext* ctx, void* obj) { + return ctx->AtomicSequenceGetBatchSize(static_cast<jobject>(obj)); + } + + void IGNITE_CALL IgniteAtomicSequenceSetBatchSize(gcj::JniContext* ctx, void* obj, int size) { + return ctx->AtomicSequenceSetBatchSize(static_cast<jobject>(obj), size); + } + + bool IGNITE_CALL IgniteAtomicSequenceIsClosed(gcj::JniContext* ctx, void* obj) { + return ctx->AtomicSequenceIsClosed(static_cast<jobject>(obj)); + } + + void IGNITE_CALL IgniteAtomicSequenceClose(gcj::JniContext* ctx, void* obj) { + return ctx->AtomicSequenceClose(static_cast<jobject>(obj)); + } + + bool IGNITE_CALL IgniteAtomicReferenceIsClosed(gcj::JniContext* ctx, void* obj) { + return ctx->AtomicReferenceIsClosed(static_cast<jobject>(obj)); + } + + void IGNITE_CALL IgniteAtomicReferenceClose(gcj::JniContext* ctx, void* obj) { + 
ctx->AtomicReferenceClose(static_cast<jobject>(obj)); + } bool IGNITE_CALL IgniteListenableCancel(gcj::JniContext* ctx, void* obj) { return ctx->ListenableCancel(static_cast<jobject>(obj)); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/cpp/common/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/common/src/java.cpp b/modules/platforms/cpp/common/src/java.cpp index e36c1e0..d6f7ef0 100644 --- a/modules/platforms/cpp/common/src/java.cpp +++ b/modules/platforms/cpp/common/src/java.cpp @@ -203,6 +203,8 @@ namespace ignite JniMethod M_PLATFORM_PROCESSOR_SERVICES = JniMethod("services", "(Lorg/apache/ignite/internal/processors/platform/PlatformTarget;)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); JniMethod M_PLATFORM_PROCESSOR_EXTENSIONS = JniMethod("extensions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); JniMethod M_PLATFORM_PROCESSOR_ATOMIC_LONG = JniMethod("atomicLong", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); + JniMethod M_PLATFORM_PROCESSOR_ATOMIC_SEQUENCE = JniMethod("atomicSequence", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); + JniMethod M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE = JniMethod("atomicReference", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); JniMethod M_PLATFORM_PROCESSOR_GET_IGNITE_CONFIGURATION = JniMethod("getIgniteConfiguration", "(J)V", false); const char* C_PLATFORM_TARGET = "org/apache/ignite/internal/processors/platform/PlatformTarget"; @@ -396,6 +398,21 @@ namespace ignite JniMethod M_PLATFORM_ATOMIC_LONG_IS_CLOSED = JniMethod("isClosed", "()Z", false); JniMethod M_PLATFORM_ATOMIC_LONG_CLOSE = JniMethod("close", "()V", false); + const char* C_PLATFORM_ATOMIC_SEQUENCE = 
"org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicSequence"; + JniMethod M_PLATFORM_ATOMIC_SEQUENCE_GET = JniMethod("get", "()J", false); + JniMethod M_PLATFORM_ATOMIC_SEQUENCE_INCREMENT_AND_GET = JniMethod("incrementAndGet", "()J", false); + JniMethod M_PLATFORM_ATOMIC_SEQUENCE_GET_AND_INCREMENT = JniMethod("getAndIncrement", "()J", false); + JniMethod M_PLATFORM_ATOMIC_SEQUENCE_ADD_AND_GET = JniMethod("addAndGet", "(J)J", false); + JniMethod M_PLATFORM_ATOMIC_SEQUENCE_GET_AND_ADD = JniMethod("getAndAdd", "(J)J", false); + JniMethod M_PLATFORM_ATOMIC_SEQUENCE_GET_BATCH_SIZE = JniMethod("getBatchSize", "()I", false); + JniMethod M_PLATFORM_ATOMIC_SEQUENCE_SET_BATCH_SIZE = JniMethod("setBatchSize", "(I)V", false); + JniMethod M_PLATFORM_ATOMIC_SEQUENCE_IS_CLOSED = JniMethod("isClosed", "()Z", false); + JniMethod M_PLATFORM_ATOMIC_SEQUENCE_CLOSE = JniMethod("close", "()V", false); + + const char* C_PLATFORM_ATOMIC_REFERENCE = "org/apache/ignite/internal/processors/platform/datastructures/PlatformAtomicReference"; + JniMethod M_PLATFORM_ATOMIC_REFERENCE_IS_CLOSED = JniMethod("isClosed", "()Z", false); + JniMethod M_PLATFORM_ATOMIC_REFERENCE_CLOSE = JniMethod("close", "()V", false); + const char* C_PLATFORM_LISTENABLE = "org/apache/ignite/internal/processors/platform/utils/PlatformListenable"; JniMethod M_PLATFORM_LISTENABLE_CANCEL = JniMethod("cancel", "()Z", false); JniMethod M_PLATFORM_LISTENABLE_IS_CANCELED = JniMethod("isCancelled", "()Z", false); @@ -652,6 +669,8 @@ namespace ignite m_PlatformProcessor_services = FindMethod(env, c_PlatformProcessor, M_PLATFORM_PROCESSOR_SERVICES); m_PlatformProcessor_extensions = FindMethod(env, c_PlatformProcessor, M_PLATFORM_PROCESSOR_EXTENSIONS); m_PlatformProcessor_atomicLong = FindMethod(env, c_PlatformProcessor, M_PLATFORM_PROCESSOR_ATOMIC_LONG); + m_PlatformProcessor_atomicSequence = FindMethod(env, c_PlatformProcessor, M_PLATFORM_PROCESSOR_ATOMIC_SEQUENCE); + m_PlatformProcessor_atomicReference 
= FindMethod(env, c_PlatformProcessor, M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE); m_PlatformProcessor_getIgniteConfiguration = FindMethod(env, c_PlatformProcessor, M_PLATFORM_PROCESSOR_GET_IGNITE_CONFIGURATION); c_PlatformTarget = FindClass(env, C_PLATFORM_TARGET); @@ -695,6 +714,21 @@ namespace ignite m_PlatformAtomicLong_isClosed = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_IS_CLOSED); m_PlatformAtomicLong_close = FindMethod(env, c_PlatformAtomicLong, M_PLATFORM_ATOMIC_LONG_CLOSE); + jclass c_PlatformAtomicSequence = FindClass(env, C_PLATFORM_ATOMIC_SEQUENCE); + m_PlatformAtomicSequence_get = FindMethod(env, c_PlatformAtomicSequence, M_PLATFORM_ATOMIC_SEQUENCE_GET); + m_PlatformAtomicSequence_incrementAndGet = FindMethod(env, c_PlatformAtomicSequence, M_PLATFORM_ATOMIC_SEQUENCE_INCREMENT_AND_GET); + m_PlatformAtomicSequence_getAndIncrement = FindMethod(env, c_PlatformAtomicSequence, M_PLATFORM_ATOMIC_SEQUENCE_GET_AND_INCREMENT); + m_PlatformAtomicSequence_addAndGet = FindMethod(env, c_PlatformAtomicSequence, M_PLATFORM_ATOMIC_SEQUENCE_ADD_AND_GET); + m_PlatformAtomicSequence_getAndAdd = FindMethod(env, c_PlatformAtomicSequence, M_PLATFORM_ATOMIC_SEQUENCE_GET_AND_ADD); + m_PlatformAtomicSequence_getBatchSize = FindMethod(env, c_PlatformAtomicSequence, M_PLATFORM_ATOMIC_SEQUENCE_GET_BATCH_SIZE); + m_PlatformAtomicSequence_setBatchSize = FindMethod(env, c_PlatformAtomicSequence, M_PLATFORM_ATOMIC_SEQUENCE_SET_BATCH_SIZE); + m_PlatformAtomicSequence_isClosed = FindMethod(env, c_PlatformAtomicSequence, M_PLATFORM_ATOMIC_SEQUENCE_IS_CLOSED); + m_PlatformAtomicSequence_close = FindMethod(env, c_PlatformAtomicSequence, M_PLATFORM_ATOMIC_SEQUENCE_CLOSE); + + jclass c_PlatformAtomicReference = FindClass(env, C_PLATFORM_ATOMIC_REFERENCE); + m_PlatformAtomicReference_isClosed = FindMethod(env, c_PlatformAtomicReference, M_PLATFORM_ATOMIC_REFERENCE_IS_CLOSED); + m_PlatformAtomicReference_close = FindMethod(env, c_PlatformAtomicReference, 
M_PLATFORM_ATOMIC_REFERENCE_CLOSE); + c_PlatformListenable = FindClass(env, C_PLATFORM_LISTENABLE); m_PlatformListenable_cancel = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_CANCEL); m_PlatformListenable_isCancelled = FindMethod(env, c_PlatformListenable, M_PLATFORM_LISTENABLE_IS_CANCELED); @@ -1307,6 +1341,38 @@ namespace ignite return LocalToGlobal(env, res); } + jobject JniContext::ProcessorAtomicSequence(jobject obj, char* name, long long initVal, bool create) + { + JNIEnv* env = Attach(); + + jstring name0 = name != NULL ? env->NewStringUTF(name) : NULL; + + jobject res = env->CallObjectMethod(obj, jvm->GetMembers().m_PlatformProcessor_atomicSequence, name0, initVal, create); + + if (name0) + env->DeleteLocalRef(name0); + + ExceptionCheck(env); + + return LocalToGlobal(env, res); + } + + jobject JniContext::ProcessorAtomicReference(jobject obj, char* name, long long memPtr, bool create) + { + JNIEnv* env = Attach(); + + jstring name0 = name != NULL ? env->NewStringUTF(name) : NULL; + + jobject res = env->CallObjectMethod(obj, jvm->GetMembers().m_PlatformProcessor_atomicReference, name0, memPtr, create); + + if (name0) + env->DeleteLocalRef(name0); + + ExceptionCheck(env); + + return LocalToGlobal(env, res); + } + void JniContext::ProcessorGetIgniteConfiguration(jobject obj, long memPtr) { JNIEnv* env = Attach(); @@ -2139,6 +2205,121 @@ namespace ignite ExceptionCheck(env); } + long long JniContext::AtomicSequenceGet(jobject obj) + { + JNIEnv* env = Attach(); + + long long res = env->CallLongMethod(obj, jvm->GetMembers().m_PlatformAtomicSequence_get); + + ExceptionCheck(env); + + return res; + } + + long long JniContext::AtomicSequenceIncrementAndGet(jobject obj) + { + JNIEnv* env = Attach(); + + long long res = env->CallLongMethod(obj, jvm->GetMembers().m_PlatformAtomicSequence_incrementAndGet); + + ExceptionCheck(env); + + return res; + } + + long long JniContext::AtomicSequenceGetAndIncrement(jobject obj) + { + JNIEnv* env = Attach(); + + 
long long res = env->CallLongMethod(obj, jvm->GetMembers().m_PlatformAtomicSequence_getAndIncrement); + + ExceptionCheck(env); + + return res; + } + + long long JniContext::AtomicSequenceAddAndGet(jobject obj, long long l) + { + JNIEnv* env = Attach(); + + long long res = env->CallLongMethod(obj, jvm->GetMembers().m_PlatformAtomicSequence_addAndGet, l); + + ExceptionCheck(env); + + return res; + } + + long long JniContext::AtomicSequenceGetAndAdd(jobject obj, long long l) + { + JNIEnv* env = Attach(); + + long long res = env->CallLongMethod(obj, jvm->GetMembers().m_PlatformAtomicSequence_getAndAdd, l); + + ExceptionCheck(env); + + return res; + } + + int JniContext::AtomicSequenceGetBatchSize(jobject obj) + { + JNIEnv* env = Attach(); + + int res = env->CallIntMethod(obj, jvm->GetMembers().m_PlatformAtomicSequence_getBatchSize); + + ExceptionCheck(env); + + return res; + } + + void JniContext::AtomicSequenceSetBatchSize(jobject obj, int size) + { + JNIEnv* env = Attach(); + + env->CallVoidMethod(obj, jvm->GetMembers().m_PlatformAtomicSequence_setBatchSize, size); + + ExceptionCheck(env); + } + + bool JniContext::AtomicSequenceIsClosed(jobject obj) + { + JNIEnv* env = Attach(); + + jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformAtomicSequence_isClosed); + + ExceptionCheck(env); + + return res != 0; + } + + void JniContext::AtomicSequenceClose(jobject obj) + { + JNIEnv* env = Attach(); + + env->CallVoidMethod(obj, jvm->GetMembers().m_PlatformAtomicSequence_close); + + ExceptionCheck(env); + } + + bool JniContext::AtomicReferenceIsClosed(jobject obj) + { + JNIEnv* env = Attach(); + + jboolean res = env->CallBooleanMethod(obj, jvm->GetMembers().m_PlatformAtomicReference_isClosed); + + ExceptionCheck(env); + + return res != 0; + } + + void JniContext::AtomicReferenceClose(jobject obj) + { + JNIEnv* env = Attach(); + + env->CallVoidMethod(obj, jvm->GetMembers().m_PlatformAtomicReference_close); + + ExceptionCheck(env); + } + bool 
JniContext::ListenableCancel(jobject obj) { JNIEnv* env = Attach(); http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj index 481adfb..f5e98c5 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj @@ -107,6 +107,8 @@ <Compile Include="Compute\TaskResultTest.cs" /> <Compile Include="Dataload\DataStreamerTest.cs" /> <Compile Include="DataStructures\AtomicLongTest.cs" /> + <Compile Include="DataStructures\AtomicReferenceTest.cs" /> + <Compile Include="DataStructures\AtomicSequenceTest.cs" /> <Compile Include="EventsTest.cs" /> <Compile Include="Examples\Example.cs" /> <Compile Include="Examples\ExamplesTest.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core.Tests/DataStructures/AtomicReferenceTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/DataStructures/AtomicReferenceTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/DataStructures/AtomicReferenceTest.cs new file mode 100644 index 0000000..93375da --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/DataStructures/AtomicReferenceTest.cs @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.DataStructures +{ + using System; + using System.Linq; + using Apache.Ignite.Core.Binary; + using NUnit.Framework; + + /// <summary> + /// Atomic reference test. + /// </summary> + public class AtomicReferenceTest : IgniteTestBase + { + /** */ + private const string AtomicRefName = "testAtomicRef"; + + /// <summary> + /// Initializes a new instance of the <see cref="AtomicReferenceTest"/> class. + /// </summary> + public AtomicReferenceTest() : base("config\\compute\\compute-grid1.xml") + { + // No-op. + } + + /** <inheritdoc /> */ + public override void TestSetUp() + { + base.TestSetUp(); + + // Close test atomic if there is any + Grid.GetAtomicReference(AtomicRefName, 0, true).Close(); + } + + /** <inheritdoc /> */ + protected override IgniteConfiguration GetConfiguration(string springConfigUrl) + { + var cfg = base.GetConfiguration(springConfigUrl); + + cfg.BinaryConfiguration = new BinaryConfiguration(typeof(BinaryObj)); + + return cfg; + } + + /// <summary> + /// Tests lifecycle of the AtomicReference. 
+ /// </summary> + [Test] + public void TestCreateClose() + { + Assert.IsNull(Grid.GetAtomicReference(AtomicRefName, 10, false)); + + // Nonexistent atomic returns null + Assert.IsNull(Grid.GetAtomicReference(AtomicRefName, 10, false)); + + // Create new + var al = Grid.GetAtomicReference(AtomicRefName, 10, true); + Assert.AreEqual(AtomicRefName, al.Name); + Assert.AreEqual(10, al.Read()); + Assert.AreEqual(false, al.IsClosed); + + // Get existing with create flag + var al2 = Grid.GetAtomicReference(AtomicRefName, 5, true); + Assert.AreEqual(AtomicRefName, al2.Name); + Assert.AreEqual(10, al2.Read()); + Assert.AreEqual(false, al2.IsClosed); + + // Get existing without create flag + var al3 = Grid.GetAtomicReference(AtomicRefName, 5, false); + Assert.AreEqual(AtomicRefName, al3.Name); + Assert.AreEqual(10, al3.Read()); + Assert.AreEqual(false, al3.IsClosed); + + al.Close(); + + Assert.AreEqual(true, al.IsClosed); + Assert.AreEqual(true, al2.IsClosed); + Assert.AreEqual(true, al3.IsClosed); + + Assert.IsNull(Grid.GetAtomicReference(AtomicRefName, 10, false)); + } + + /// <summary> + /// Tests modification methods. + /// </summary> + [Test] + public void TestModify() + { + var atomics = Enumerable.Range(1, 10) + .Select(x => Grid.GetAtomicReference(AtomicRefName, 5, true)).ToList(); + + atomics.ForEach(x => Assert.AreEqual(5, x.Read())); + + atomics[0].Write(15); + atomics.ForEach(x => Assert.AreEqual(15, x.Read())); + + Assert.AreEqual(15, atomics[0].CompareExchange(42, 15)); + atomics.ForEach(x => Assert.AreEqual(42, x.Read())); + } + + /// <summary> + /// Tests primitives in the atomic. + /// </summary> + [Test] + public void TestPrimitives() + { + TestOperations(1, 2); + TestOperations("1", "2"); + TestOperations(Guid.NewGuid(), Guid.NewGuid()); + } + + /// <summary> + /// Tests DateTime in the atomic. 
+ /// </summary> + [Test] + [Ignore("IGNITE-2578")] + public void TestDateTime() + { + TestOperations(DateTime.Now, DateTime.Now.AddDays(-1)); + } + + /// <summary> + /// Tests serializable objects in the atomic. + /// </summary> + [Test] + [Ignore("IGNITE-2578")] + public void TestSerializable() + { + TestOperations(new SerializableObj {Foo = 16}, new SerializableObj {Foo = -5}); + } + + /// <summary> + /// Tests binarizable objects in the atomic. + /// </summary> + [Test] + public void TestBinarizable() + { + TestOperations(new BinaryObj {Foo = 16}, new BinaryObj {Foo = -5}); + } + + /// <summary> + /// Tests operations on specific object. + /// </summary> + /// <typeparam name="T"></typeparam> + /// <param name="x">The x.</param> + /// <param name="y">The y.</param> + private void TestOperations<T>(T x, T y) + { + Grid.GetAtomicReference(AtomicRefName, 0, true).Close(); + + var atomic = Grid.GetAtomicReference(AtomicRefName, x, true); + + Assert.AreEqual(x, atomic.Read()); + + atomic.Write(y); + Assert.AreEqual(y, atomic.Read()); + + var old = atomic.CompareExchange(x, y); + Assert.AreEqual(y, old); + Assert.AreEqual(x, atomic.Read()); + + old = atomic.CompareExchange(x, y); + Assert.AreEqual(x, old); + Assert.AreEqual(x, atomic.Read()); + + // Check nulls + var nul = default(T); + + old = atomic.CompareExchange(nul, x); + Assert.AreEqual(x, old); + Assert.AreEqual(nul, atomic.Read()); + + old = atomic.CompareExchange(y, nul); + Assert.AreEqual(nul, old); + Assert.AreEqual(y, atomic.Read()); + } + + /// <summary> + /// Serializable. 
+ /// </summary> + [Serializable] + private class SerializableObj + { + /** */ + public int Foo { get; set; } + + /** <inheritdoc /> */ + private bool Equals(SerializableObj other) + { + return Foo == other.Foo; + } + + /** <inheritdoc /> */ + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + if (ReferenceEquals(this, obj)) return true; + if (obj.GetType() != GetType()) return false; + return Equals((SerializableObj) obj); + } + + /** <inheritdoc /> */ + public override int GetHashCode() + { + // ReSharper disable once NonReadonlyMemberInGetHashCode + return Foo; + } + + /** <inheritdoc /> */ + public override string ToString() + { + return base.ToString() + "[" + Foo + "]"; + } + } + + /// <summary> + /// Binary. + /// </summary> + private sealed class BinaryObj : SerializableObj + { + // No-op. + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core.Tests/DataStructures/AtomicSequenceTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/DataStructures/AtomicSequenceTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/DataStructures/AtomicSequenceTest.cs new file mode 100644 index 0000000..472dee2 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/DataStructures/AtomicSequenceTest.cs @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Tests.DataStructures +{ + using System.Linq; + using NUnit.Framework; + + /// <summary> + /// Atomic sequence test. + /// </summary> + public class AtomicSequenceTest : IgniteTestBase + { + /** */ + private const string AtomicSeqName = "testAtomicSeq"; + + /// <summary> + /// Initializes a new instance of the <see cref="AtomicSequenceTest"/> class. + /// </summary> + public AtomicSequenceTest() : base("config\\compute\\compute-grid1.xml") + { + // No-op. + } + + /** <inheritdoc /> */ + public override void TestSetUp() + { + base.TestSetUp(); + + // Close test atomic if there is any + Grid.GetAtomicSequence(AtomicSeqName, 0, true).Close(); + } + + /// <summary> + /// Tests lifecycle of the AtomicSequence. 
+ /// </summary> + [Test] + public void TestCreateClose() + { + Assert.IsNull(Grid.GetAtomicSequence(AtomicSeqName, 10, false)); + + // Nonexistent atomic returns null + Assert.IsNull(Grid.GetAtomicSequence(AtomicSeqName, 10, false)); + + // Create new + var al = Grid.GetAtomicSequence(AtomicSeqName, 10, true); + Assert.AreEqual(AtomicSeqName, al.Name); + Assert.AreEqual(10, al.Read()); + Assert.AreEqual(false, al.IsClosed); + + // Get existing with create flag + var al2 = Grid.GetAtomicSequence(AtomicSeqName, 5, true); + Assert.AreEqual(AtomicSeqName, al2.Name); + Assert.AreEqual(10, al2.Read()); + Assert.AreEqual(false, al2.IsClosed); + + // Get existing without create flag + var al3 = Grid.GetAtomicSequence(AtomicSeqName, 5, false); + Assert.AreEqual(AtomicSeqName, al3.Name); + Assert.AreEqual(10, al3.Read()); + Assert.AreEqual(false, al3.IsClosed); + + al.Close(); + + Assert.AreEqual(true, al.IsClosed); + Assert.AreEqual(true, al2.IsClosed); + Assert.AreEqual(true, al3.IsClosed); + + Assert.IsNull(Grid.GetAtomicSequence(AtomicSeqName, 10, false)); + } + + /// <summary> + /// Tests modification methods. + /// </summary> + [Test] + public void TestModify() + { + var atomics = Enumerable.Range(1, 10) + .Select(x => Grid.GetAtomicSequence(AtomicSeqName, 5, true)).ToList(); + + atomics.ForEach(x => Assert.AreEqual(5, x.Read())); + + Assert.AreEqual(10, atomics[0].Add(5)); + atomics.ForEach(x => Assert.AreEqual(10, x.Read())); + + Assert.AreEqual(11, atomics[0].Increment()); + atomics.ForEach(x => Assert.AreEqual(11, x.Read())); + + atomics.ForEach(x => x.BatchSize = 42); + atomics.ForEach(x => Assert.AreEqual(42, x.BatchSize)); + } + + /// <summary> + /// Tests multithreaded scenario. 
+ /// </summary> + [Test] + public void TestMultithreaded() + { + const int atomicCnt = 10; + const int threadCnt = 5; + const int iterations = 3000; + + // 10 atomics with same name + var atomics = Enumerable.Range(1, atomicCnt) + .Select(x => Grid.GetAtomicSequence(AtomicSeqName, 0, true)).ToList(); + + // 5 threads increment 30000 times + TestUtils.RunMultiThreaded(() => + { + for (var i = 0; i < iterations; i++) + atomics.ForEach(x => x.Increment()); + }, threadCnt); + + atomics.ForEach(x => Assert.AreEqual(atomicCnt*threadCnt*iterations, x.Read())); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 1c83168..e2efd0a 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -161,6 +161,8 @@ <Compile Include="Datastream\StreamTransformer.cs" /> <Compile Include="Datastream\StreamVisitor.cs" /> <Compile Include="DataStructures\IAtomicLong.cs" /> + <Compile Include="DataStructures\IAtomicReference.cs" /> + <Compile Include="DataStructures\IAtomicSequence.cs" /> <Compile Include="DataStructures\Package-Info.cs" /> <Compile Include="Events\CacheEvent.cs" /> <Compile Include="Events\CacheQueryExecutedEvent.cs" /> @@ -259,6 +261,8 @@ <Compile Include="Impl\Datastream\DataStreamerRemoveEntry.cs" /> <Compile Include="Impl\Datastream\StreamReceiverHolder.cs" /> <Compile Include="Impl\DataStructures\AtomicLong.cs" /> + <Compile Include="Impl\DataStructures\AtomicReference.cs" /> + <Compile Include="Impl\DataStructures\AtomicSequence.cs" /> <Compile Include="Impl\Events\Events.cs" /> <Compile 
Include="Impl\Events\RemoteListenEventFilter.cs" /> <Compile Include="Impl\ExceptionUtils.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core/DataStructures/IAtomicReference.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/DataStructures/IAtomicReference.cs b/modules/platforms/dotnet/Apache.Ignite.Core/DataStructures/IAtomicReference.cs new file mode 100644 index 0000000..403c0ca --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/DataStructures/IAtomicReference.cs @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.DataStructures +{ + /// <summary> + /// Represents a named value in the distributed cache. + /// </summary> + public interface IAtomicReference<T> + { + /// <summary> + /// Gets the name of this atomic reference. + /// </summary> + /// <value> + /// Name of this atomic reference. + /// </value> + string Name { get; } + + /// <summary> + /// Reads current value of an atomic reference. 
+ /// </summary> + /// <returns>Current value of an atomic reference.</returns> + T Read(); + + /// <summary> + /// Writes current value of an atomic reference. + /// </summary> + /// <param name="value">The value to set.</param> + void Write(T value); + + /// <summary> + /// Compares current value with specified value for equality and, if they are equal, replaces current value. + /// </summary> + /// <param name="value">The value to set.</param> + /// <param name="comparand">The value that is compared to the current value.</param> + /// <returns>Original value of the atomic reference.</returns> + T CompareExchange(T value, T comparand); + + /// <summary> + /// Determines whether this instance was removed from cache. + /// </summary> + /// <returns>True if this atomic was removed from cache; otherwise, false.</returns> + bool IsClosed { get; } + + /// <summary> + /// Closes this instance. + /// </summary> + void Close(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core/DataStructures/IAtomicSequence.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/DataStructures/IAtomicSequence.cs b/modules/platforms/dotnet/Apache.Ignite.Core/DataStructures/IAtomicSequence.cs new file mode 100644 index 0000000..f5b1dad --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/DataStructures/IAtomicSequence.cs @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.DataStructures +{ + /// <summary> + /// Represents a distributed atomic sequence of numbers. + /// </summary> + public interface IAtomicSequence + { + /// <summary> + /// Gets the name of this atomic sequence. + /// </summary> + /// <value> + /// Name of this atomic sequence. + /// </value> + string Name { get; } + + /// <summary> + /// Returns current value. + /// </summary> + /// <returns>Current value of the atomic sequence.</returns> + long Read(); + + /// <summary> + /// Increments current value and returns result. + /// </summary> + /// <returns>The new value of the atomic sequence.</returns> + long Increment(); + + /// <summary> + /// Adds specified value to the current value and returns result. + /// </summary> + /// <param name="value">The value to add.</param> + /// <returns>The new value of the atomic sequence.</returns> + long Add(long value); + + /// <summary> + /// Gets or sets local batch size for this atomic sequence. + /// </summary> + /// <returns>Sequence batch size.</returns> + int BatchSize { get; set; } + + /// <summary> + /// Determines whether this instance was removed from cache. + /// </summary> + /// <returns>True if this atomic was removed from cache; otherwise, false.</returns> + bool IsClosed { get; } + + /// <summary> + /// Closes this instance. 
+ /// </summary> + void Close(); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs index d18e790..12ea09e 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/IIgnite.cs @@ -192,6 +192,34 @@ namespace Apache.Ignite.Core IAtomicLong GetAtomicLong(string name, long initialValue, bool create); /// <summary> + /// Gets an atomic sequence with specified name from cache. + /// Creates new atomic sequence in cache if it does not exist and <paramref name="create"/> is true. + /// </summary> + /// <param name="name">Name of the atomic sequence.</param> + /// <param name="initialValue"> + /// Initial value for the atomic sequence. Ignored if <paramref name="create"/> is false. + /// </param> + /// <param name="create">Flag indicating whether atomic sequence should be created if it does not exist.</param> + /// <returns>Atomic sequence instance with specified name, + /// or null if it does not exist and <paramref name="create"/> flag is not set.</returns> + /// <exception cref="IgniteException">If atomic sequence could not be fetched or created.</exception> + IAtomicSequence GetAtomicSequence(string name, long initialValue, bool create); + + /// <summary> + /// Gets an atomic reference with specified name from cache. + /// Creates new atomic reference in cache if it does not exist and <paramref name="create"/> is true. + /// </summary> + /// <param name="name">Name of the atomic reference.</param> + /// <param name="initialValue"> + /// Initial value for the atomic reference. Ignored if <paramref name="create"/> is false. 
+ /// </param> + /// <param name="create">Flag indicating whether atomic reference should be created if it does not exist.</param> + /// <returns>Atomic reference instance with specified name, + /// or null if it does not exist and <paramref name="create"/> flag is not set.</returns> + /// <exception cref="IgniteException">If atomic reference could not be fetched or created.</exception> + IAtomicReference<T> GetAtomicReference<T>(string name, T initialValue, bool create); + + /// <summary> /// Gets the configuration of this Ignite instance. /// </summary> [SuppressMessage("Microsoft.Design", "CA1024:UsePropertiesWhereAppropriate", Justification = "Semantics.")] http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs new file mode 100644 index 0000000..e871412 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicReference.cs @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.DataStructures +{ + using System.Diagnostics; + using Apache.Ignite.Core.DataStructures; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Unmanaged; + + /// <summary> + /// Atomic reference. + /// </summary> + internal class AtomicReference<T> : PlatformTarget, IAtomicReference<T> + { + /** Opcodes. */ + private enum Op + { + Get = 1, + Set = 2, + CompareAndSetAndGet = 3 + } + + /** */ + private readonly string _name; + + /** <inheritDoc /> */ + public AtomicReference(IUnmanagedTarget target, Marshaller marsh, string name) + : base(target, marsh) + { + Debug.Assert(!string.IsNullOrEmpty(name)); + + _name = name; + } + + /** <inheritDoc /> */ + public string Name + { + get { return _name; } + } + + /** <inheritDoc /> */ + public T Read() + { + return DoInOp<T>((int) Op.Get); + } + + /** <inheritDoc /> */ + public void Write(T value) + { + DoOutOp((int) Op.Set, value); + } + + /** <inheritDoc /> */ + public T CompareExchange(T value, T comparand) + { + return DoOutInOp((int) Op.CompareAndSetAndGet, + writer => + { + writer.WriteObject(value); + writer.WriteObject(comparand); + }, + stream => Marshaller.StartUnmarshal(stream).Deserialize<T>()); + } + + /** <inheritDoc /> */ + public bool IsClosed + { + get { return UnmanagedUtils.AtomicReferenceIsClosed(Target); } + } + + /** <inheritDoc /> */ + public void Close() + { + UnmanagedUtils.AtomicReferenceClose(Target); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs new file mode 100644 index 
0000000..0835b9a --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/DataStructures/AtomicSequence.cs @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Impl.DataStructures +{ + using System.Diagnostics; + using Apache.Ignite.Core.DataStructures; + using Apache.Ignite.Core.Impl.Binary; + using Apache.Ignite.Core.Impl.Unmanaged; + + /// <summary> + /// Atomic sequence wrapper. + /// </summary> + internal sealed class AtomicSequence: PlatformTarget, IAtomicSequence + { + /** Name returned by the Name property. */ + private readonly string _name; + + /// <summary> + /// Initializes a new instance of the <see cref="Apache.Ignite.Core.Impl.DataStructures.AtomicSequence"/> class. 
+ /// </summary> + /// <param name="target">The unmanaged target, passed to the base class.</param> + /// <param name="marsh">The marshaller, passed to the base class.</param> + /// <param name="name">The name; asserted to be not null or empty.</param> + public AtomicSequence(IUnmanagedTarget target, Marshaller marsh, string name) + : base(target, marsh) + { + Debug.Assert(!string.IsNullOrEmpty(name)); + + _name = name; + } + + /** <inheritDoc /> */ + public string Name + { + get { return _name; } + } + + /** <inheritDoc /> */ + public long Read() + { + return UnmanagedUtils.AtomicSequenceGet(Target); + } + + /** <inheritDoc /> */ + public long Increment() + { + return UnmanagedUtils.AtomicSequenceIncrementAndGet(Target); + } + + /** <inheritDoc /> */ + public long Add(long value) + { + return UnmanagedUtils.AtomicSequenceAddAndGet(Target, value); + } + + /** <inheritDoc /> */ + public int BatchSize + { + get { return UnmanagedUtils.AtomicSequenceGetBatchSize(Target); } + set { UnmanagedUtils.AtomicSequenceSetBatchSize(Target, value); } + } + + /** <inheritDoc /> */ + public bool IsClosed + { + get { return UnmanagedUtils.AtomicSequenceIsClosed(Target); } + } + + /** <inheritDoc /> */ + public void Close() + { + UnmanagedUtils.AtomicSequenceClose(Target); + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs index 9d27117..be21d7f 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Ignite.cs @@ -486,6 +486,56 @@ namespace Apache.Ignite.Core.Impl } /** <inheritdoc /> */ + public IAtomicSequence GetAtomicSequence(string name, long initialValue, bool create) + { + IgniteArgumentCheck.NotNullOrEmpty(name, "name"); + + var nativeSeq = UU.ProcessorAtomicSequence(_proc, name, initialValue, create); + + if (nativeSeq 
== null) + return null; + + return new AtomicSequence(nativeSeq, Marshaller, name); + } + + /** <inheritdoc /> */ + public IAtomicReference<T> GetAtomicReference<T>(string name, T initialValue, bool create) + { + IgniteArgumentCheck.NotNullOrEmpty(name, "name"); + + var refTarget = GetAtomicReferenceUnmanaged(name, initialValue, create); + + return refTarget == null ? null : new AtomicReference<T>(refTarget, Marshaller, name); + } + + /// <summary> + /// Gets the unmanaged atomic reference, marshalling the initial value only when creation is requested. + /// </summary> + /// <param name="name">The name; checked to be not null or empty.</param> + /// <param name="initialValue">The initial value; written to a memory stream only when <paramref name="create"/> is true.</param> + /// <param name="create">Create flag, passed through to the processor call.</param> + /// <returns>Unmanaged atomic reference, or null.</returns> + private IUnmanagedTarget GetAtomicReferenceUnmanaged<T>(string name, T initialValue, bool create) + { + IgniteArgumentCheck.NotNullOrEmpty(name, "name"); + + // Do not allocate memory when the initial value is not used (create is false). + if (!create) + return UU.ProcessorAtomicReference(_proc, name, 0, false); + + using (var stream = IgniteManager.Memory.Allocate().GetStream()) + { + var writer = Marshaller.StartMarshal(stream); + + writer.Write(initialValue); + + var memPtr = stream.SynchronizeOutput(); + + return UU.ProcessorAtomicReference(_proc, name, memPtr, true); + } + } + + /** <inheritdoc /> */ public IgniteConfiguration GetConfiguration() { using (var stream = IgniteManager.Memory.Allocate(1024).GetStream()) http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteProxy.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteProxy.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteProxy.cs index 46bc3ca..a303783 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteProxy.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/IgniteProxy.cs @@ -340,6 +340,18 @@ namespace Apache.Ignite.Core.Impl } /** 
<inheritdoc /> */ + public IAtomicSequence GetAtomicSequence(string name, long initialValue, bool create) + { + return _ignite.GetAtomicSequence(name, initialValue, create); + } + + /** <inheritdoc /> */ + public IAtomicReference<T> GetAtomicReference<T>(string name, T initialValue, bool create) + { + return _ignite.GetAtomicReference(name, initialValue, create); + } + + /** <inheritdoc /> */ public void WriteBinary(IBinaryWriter writer) { // No-op. http://git-wip-us.apache.org/repos/asf/ignite/blob/f7c1296c/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs index 17df94a..28eb208 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Impl/Unmanaged/IgniteJniNativeMethods.cs @@ -93,6 +93,14 @@ namespace Apache.Ignite.Core.Impl.Unmanaged public static extern void* ProcessorAtomicLong(void* ctx, void* obj, sbyte* name, long initVal, [MarshalAs(UnmanagedType.U1)] bool create); + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteProcessorAtomicSequence")] + public static extern void* ProcessorAtomicSequence(void* ctx, void* obj, sbyte* name, long initVal, + [MarshalAs(UnmanagedType.U1)] bool create); + + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteProcessorAtomicReference")] + public static extern void* ProcessorAtomicReference(void* ctx, void* obj, sbyte* name, long memPtr, + [MarshalAs(UnmanagedType.U1)] bool create); + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteProcessorGetIgniteConfiguration")] public static extern void ProcessorGetIgniteConfiguration(void* ctx, void* obj, long memPtr); @@ -373,6 +381,35 @@ namespace 
Apache.Ignite.Core.Impl.Unmanaged [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicLongClose")] public static extern void AtomicLongClose(void* ctx, void* target); + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicSequenceGet")] + public static extern long AtomicSequenceGet(void* ctx, void* target); + + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicSequenceIncrementAndGet")] + public static extern long AtomicSequenceIncrementAndGet(void* ctx, void* target); + + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicSequenceAddAndGet")] + public static extern long AtomicSequenceAddAndGet(void* ctx, void* target, long value); + + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicSequenceGetBatchSize")] + public static extern int AtomicSequenceGetBatchSize(void* ctx, void* target); + + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicSequenceSetBatchSize")] + public static extern void AtomicSequenceSetBatchSize(void* ctx, void* target, int size); + + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicSequenceIsClosed")] + [return: MarshalAs(UnmanagedType.U1)] + public static extern bool AtomicSequenceIsClosed(void* ctx, void* target); + + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicSequenceClose")] + public static extern void AtomicSequenceClose(void* ctx, void* target); + + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicReferenceIsClosed")] + [return: MarshalAs(UnmanagedType.U1)] + public static extern bool AtomicReferenceIsClosed(void* ctx, void* target); + + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteAtomicReferenceClose")] + public static extern void AtomicReferenceClose(void* ctx, void* target); + [DllImport(IgniteUtils.FileIgniteJniDll, EntryPoint = "IgniteListenableCancel")] [return: MarshalAs(UnmanagedType.U1)] public static extern bool ListenableCancel(void* ctx, void* target);
