IGNITE-3511: .NET: Fixed AffinityFunctionBase.cs placement, added missing Package-Info.cs files.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/51add504 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/51add504 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/51add504 Branch: refs/heads/ignite-1232-1 Commit: 51add5049c9603a143a1a8f9233d94f92c8394da Parents: ecc77b0 Author: Pavel Tupitsyn <ptupit...@apache.org> Authored: Tue Jul 19 17:11:46 2016 +0300 Committer: vozerov-gridgain <voze...@gridgain.com> Committed: Tue Jul 19 17:11:46 2016 +0300 ---------------------------------------------------------------------- .../GridAffinityFunctionContextImpl.java | 9 + .../affinity/PlatformAffinityFunction.java | 277 +++++++++++++++++++ .../PlatformAffinityFunctionTarget.java | 113 ++++++++ .../cache/affinity/PlatformAffinityUtils.java | 116 ++++++++ .../callback/PlatformCallbackGateway.java | 89 ++++++ .../callback/PlatformCallbackUtils.java | 49 ++++ .../PlatformDotNetConfigurationClosure.java | 95 +++++++ .../dotnet/PlatformDotNetAffinityFunction.java | 171 ++++++++++++ .../platforms/cpp/jni/include/ignite/jni/java.h | 20 +- modules/platforms/cpp/jni/src/java.cpp | 41 ++- .../Apache.Ignite.Core.Tests.csproj | 7 + .../Affinity/AffinityFunctionSpringTest.cs | 184 ++++++++++++ .../Config/Cache/Affinity/affinity-function.xml | 129 +++++++++ .../Cache/Affinity/affinity-function2.xml | 49 ++++ .../Apache.Ignite.Core.Tests/TestRunner.cs | 3 +- .../Apache.Ignite.Core.csproj | 16 ++ .../Cache/Affinity/AffinityFunctionContext.cs | 120 ++++++++ .../Cache/Affinity/AffinityTopologyVersion.cs | 138 +++++++++ .../Cache/Affinity/Fair/FairAffinityFunction.cs | 33 +++ .../Cache/Affinity/Fair/Package-Info.cs | 26 ++ .../Cache/Affinity/IAffinityFunction.cs | 82 ++++++ .../Cache/Affinity/Package-Info.cs | 26 ++ .../Cache/Affinity/Rendezvous/Package-Info.cs | 26 ++ .../Rendezvous/RendezvousAffinityFunction.cs | 32 +++ .../Cache/Configuration/Package-Info.cs | 26 ++ .../Cache/Eviction/Package-Info.cs | 
26 ++ .../Communication/Package-Info.cs | 26 ++ .../Communication/Tcp/Package-Info.cs | 26 ++ .../Apache.Ignite.Core/Events/EventReader.cs | 8 +- .../dotnet/Apache.Ignite.Core/Ignition.cs | 38 ++- .../Impl/Binary/BinaryReaderExtensions.cs | 16 +- .../Impl/Binary/Marshaller.cs | 1 + .../Impl/Cache/Affinity/AffinityFunctionBase.cs | 140 ++++++++++ .../Affinity/AffinityFunctionSerializer.cs | 277 +++++++++++++++++++ .../Cache/Affinity/PlatformAffinityFunction.cs | 74 +++++ .../Impl/Common/ObjectInfoHolder.cs | 86 ++++++ .../Apache.Ignite.Core/Impl/IgniteUtils.cs | 10 +- .../Impl/Unmanaged/UnmanagedCallbackHandlers.cs | 6 + .../Impl/Unmanaged/UnmanagedCallbacks.cs | 131 ++++++++- 39 files changed, 2715 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java index 6c97efd..4ddee00 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/affinity/GridAffinityFunctionContextImpl.java @@ -80,4 +80,13 @@ public class GridAffinityFunctionContextImpl implements AffinityFunctionContext @Override public int backups() { return backups; } + + /** + * Gets the previous assignment. + * + * @return Previous assignment. 
+ */ + public List<List<ClusterNode>> prevAssignment() { + return prevAssignment; + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java new file mode 100644 index 0000000..6681e7a --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunction.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; +import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream; +import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.resources.IgniteInstanceResource; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.List; +import java.util.UUID; + +/** + * Platform AffinityFunction. + */ +public class PlatformAffinityFunction implements AffinityFunction, Externalizable, LifecycleAware { + /** */ + private static final long serialVersionUID = 0L; + + /** */ + private static final byte FLAG_PARTITION = 1; + + /** */ + private static final byte FLAG_REMOVE_NODE = 1 << 1; + + /** */ + private static final byte FLAG_ASSIGN_PARTITIONS = 1 << 2; + + /** */ + private Object userFunc; + + /** + * Partition count. + * + * 1) Java calls partitions() method very early (before LifecycleAware.start) during CacheConfiguration validation. + * 2) Partition count never changes. + * Therefore, we get the value on .NET side once, and pass it along with PlatformAffinity. 
+ */ + private int partitions; + + /** */ + private AffinityFunction baseFunc; + + /** */ + private byte overrideFlags; + + /** */ + private transient Ignite ignite; + + /** */ + private transient PlatformContext ctx; + + /** */ + private transient long ptr; + + /** */ + private transient PlatformAffinityFunctionTarget baseTarget; + + + /** + * Ctor for serialization. + * + */ + public PlatformAffinityFunction() { + partitions = -1; + } + + /** + * Ctor. + * + * @param func User fun object. + * @param partitions Number of partitions. + */ + public PlatformAffinityFunction(Object func, int partitions, byte overrideFlags, AffinityFunction baseFunc) { + userFunc = func; + this.partitions = partitions; + this.overrideFlags = overrideFlags; + this.baseFunc = baseFunc; + } + + /** {@inheritDoc} */ + @Override public void reset() { + // userFunc is always in initial state (it is serialized only once on start). + if (baseFunc != null) + baseFunc.reset(); + } + + /** {@inheritDoc} */ + @Override public int partitions() { + // Affinity function can not return different number of partitions, + // so we pass this value once from the platform. 
+ assert partitions > 0; + + return partitions; + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + if ((overrideFlags & FLAG_PARTITION) == 0) { + assert baseFunc != null; + + return baseFunc.partition(key); + } + + assert ctx != null; + assert ptr != 0; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + writer.writeObject(key); + + out.synchronize(); + + return ctx.gateway().affinityFunctionPartition(ptr, mem.pointer()); + } + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + if ((overrideFlags & FLAG_ASSIGN_PARTITIONS) == 0) { + assert baseFunc != null; + + return baseFunc.assignPartitions(affCtx); + } + + assert ctx != null; + assert ptr != 0; + assert affCtx != null; + + try (PlatformMemory outMem = ctx.memory().allocate()) { + try (PlatformMemory inMem = ctx.memory().allocate()) { + PlatformOutputStream out = outMem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + // Write previous assignment + PlatformAffinityUtils.writeAffinityFunctionContext(affCtx, writer, ctx); + + out.synchronize(); + + // Call platform + // We can not restore original AffinityFunctionContext after the call to platform, + // due to DiscoveryEvent (when node leaves, we can't get it by id anymore). + // Secondly, AffinityFunctionContext can't be changed by the user. 
+ if (baseTarget != null) + baseTarget.setCurrentAffinityFunctionContext(affCtx); + + try { + ctx.gateway().affinityFunctionAssignPartitions(ptr, outMem.pointer(), inMem.pointer()); + } + finally { + if (baseTarget != null) + baseTarget.setCurrentAffinityFunctionContext(null); + } + + // Read result + return PlatformAffinityUtils.readPartitionAssignment(ctx.reader(inMem), ctx); + } + } + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + if ((overrideFlags & FLAG_REMOVE_NODE) == 0) { + assert baseFunc != null; + + baseFunc.removeNode(nodeId); + + return; + } + + assert ctx != null; + assert ptr != 0; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + writer.writeUuid(nodeId); + + out.synchronize(); + + ctx.gateway().affinityFunctionRemoveNode(ptr, mem.pointer()); + } + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(userFunc); + out.writeInt(partitions); + out.writeByte(overrideFlags); + out.writeObject(baseFunc); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + userFunc = in.readObject(); + partitions = in.readInt(); + overrideFlags = in.readByte(); + baseFunc = (AffinityFunction)in.readObject(); + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + // userFunc is null when there is nothing overridden + if (userFunc == null) + return; + + assert ignite != null; + ctx = PlatformUtils.platformContext(ignite); + assert ctx != null; + + try (PlatformMemory mem = ctx.memory().allocate()) { + PlatformOutputStream out = mem.output(); + BinaryRawWriterEx writer = ctx.writer(out); + + writer.writeObject(userFunc); + + out.synchronize(); + + baseTarget = baseFunc != null + ? 
new PlatformAffinityFunctionTarget(ctx, baseFunc) + : null; + + ptr = ctx.gateway().affinityFunctionInit(mem.pointer(), baseTarget); + } + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + if (ptr == 0) + return; + + assert ctx != null; + + ctx.gateway().affinityFunctionDestroy(ptr); + } + + /** + * Injects the Ignite. + * + * @param ignite Ignite. + */ + @SuppressWarnings("unused") + @IgniteInstanceResource + public void setIgnite(Ignite ignite) { + this.ignite = ignite; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java new file mode 100644 index 0000000..8a07b33 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import org.apache.ignite.IgniteCheckedException; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.processors.platform.PlatformAbstractTarget; +import org.apache.ignite.internal.processors.platform.PlatformContext; +import org.apache.ignite.internal.util.typedef.internal.U; + +import java.util.List; + +/** + * Platform affinity function target: + * to be invoked when Platform function calls base implementation of one of the AffinityFunction methods. + */ +public class PlatformAffinityFunctionTarget extends PlatformAbstractTarget { + /** */ + private static final int OP_PARTITION = 1; + + /** */ + private static final int OP_REMOVE_NODE = 2; + + /** */ + private static final int OP_ASSIGN_PARTITIONS = 3; + + /** Inner function to delegate calls to. */ + private final AffinityFunction baseFunc; + + /** Thread local to hold the current affinity function context. */ + private static final ThreadLocal<AffinityFunctionContext> currentAffCtx = new ThreadLocal<>(); + + /** + * Constructor. + * + * @param platformCtx Context. + * @param baseFunc Function to wrap. 
+ */ + protected PlatformAffinityFunctionTarget(PlatformContext platformCtx, AffinityFunction baseFunc) { + super(platformCtx); + + assert baseFunc != null; + this.baseFunc = baseFunc; + + try { + platformCtx.kernalContext().resource().injectGeneric(baseFunc); + } + catch (IgniteCheckedException e) { + throw U.convertException(e); + } + } + + /** {@inheritDoc} */ + @Override protected long processInStreamOutLong(int type, BinaryRawReaderEx reader) throws IgniteCheckedException { + if (type == OP_PARTITION) + return baseFunc.partition(reader.readObjectDetached()); + else if (type == OP_REMOVE_NODE) { + baseFunc.removeNode(reader.readUuid()); + + return 0; + } + + return super.processInStreamOutLong(type, reader); + } + + /** {@inheritDoc} */ + @Override protected void processOutStream(int type, BinaryRawWriterEx writer) throws IgniteCheckedException { + if (type == OP_ASSIGN_PARTITIONS) { + AffinityFunctionContext affCtx = currentAffCtx.get(); + + if (affCtx == null) + throw new IgniteException("Thread-local AffinityFunctionContext is null. " + + "This may indicate an unsupported call to the base AffinityFunction."); + + final List<List<ClusterNode>> partitions = baseFunc.assignPartitions(affCtx); + + PlatformAffinityUtils.writePartitionAssignment(partitions, writer, platformContext()); + + return; + } + + super.processOutStream(type, writer); + } + + /** + * Sets the context for current operation. + * + * @param ctx Context. 
+ */ + void setCurrentAffinityFunctionContext(AffinityFunctionContext ctx) { + currentAffCtx.set(ctx); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java new file mode 100644 index 0000000..b1e1b23 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityUtils.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.internal.processors.platform.cache.affinity; + +import org.apache.ignite.binary.BinaryRawReader; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.binary.BinaryRawWriterEx; +import org.apache.ignite.internal.cluster.IgniteClusterEx; +import org.apache.ignite.internal.processors.affinity.GridAffinityFunctionContextImpl; +import org.apache.ignite.internal.processors.platform.PlatformContext; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +/** + * Affinity serialization functions. + */ +public class PlatformAffinityUtils { + /** + * Writes the affinity function context. + * @param affCtx Affinity context. + * @param writer Writer. + * @param ctx Platform context. + */ + public static void writeAffinityFunctionContext(AffinityFunctionContext affCtx, BinaryRawWriterEx writer, + PlatformContext ctx) { + assert affCtx != null; + assert writer != null; + assert ctx != null; + + ctx.writeNodes(writer, affCtx.currentTopologySnapshot()); + + writer.writeInt(affCtx.backups()); + writer.writeLong(affCtx.currentTopologyVersion().topologyVersion()); + writer.writeInt(affCtx.currentTopologyVersion().minorTopologyVersion()); + + ctx.writeEvent(writer, affCtx.discoveryEvent()); + + // Write previous assignment + List<List<ClusterNode>> prevAssignment = ((GridAffinityFunctionContextImpl)affCtx).prevAssignment(); + + if (prevAssignment == null) + writer.writeInt(-1); + else { + writer.writeInt(prevAssignment.size()); + + for (List<ClusterNode> part : prevAssignment) + ctx.writeNodes(writer, part); + } + } + + /** + * Writes the partition assignment to a stream. + * + * @param partitions Partitions. + * @param writer Writer. 
+ */ + public static void writePartitionAssignment(Collection<List<ClusterNode>> partitions, BinaryRawWriterEx writer, + PlatformContext ctx) { + assert partitions != null; + assert writer != null; + + writer.writeInt(partitions.size()); + + for (List<ClusterNode> part : partitions) + ctx.writeNodes(writer, part); + } + + /** + * Reads the partition assignment. + * + * @param reader Reader. + * @param ctx Platform context. + * @return Partitions. + */ + public static List<List<ClusterNode>> readPartitionAssignment(BinaryRawReader reader, PlatformContext ctx) { + assert reader != null; + assert ctx != null; + + int partCnt = reader.readInt(); + + List<List<ClusterNode>> res = new ArrayList<>(partCnt); + + IgniteClusterEx cluster = ctx.kernalContext().grid().cluster(); + + for (int i = 0; i < partCnt; i++) { + int partSize = reader.readInt(); + + List<ClusterNode> part = new ArrayList<>(partSize); + + for (int j = 0; j < partSize; j++) + part.add(cluster.node(reader.readUuid())); + + res.add(part); + } + + return res; + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java index 5093773..ac1416c 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackGateway.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.platform.callback; import org.apache.ignite.IgniteCheckedException; import org.apache.ignite.IgniteException; +import 
org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget; import org.apache.ignite.internal.util.GridStripedSpinBusyLock; /** @@ -950,6 +951,94 @@ public class PlatformCallbackGateway { } /** + * Initializes affinity function. + * + * @param memPtr Pointer to a stream with serialized affinity function. + * @param baseFunc Optional func for base calls. + * @return Affinity function pointer. + */ + public long affinityFunctionInit(long memPtr, PlatformAffinityFunctionTarget baseFunc) { + enter(); + + try { + return PlatformCallbackUtils.affinityFunctionInit(envPtr, memPtr, baseFunc); + } + finally { + leave(); + } + } + + /** + * Gets the partition from affinity function. + * + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with key object. + * @return Partition number for a given key. + */ + public int affinityFunctionPartition(long ptr, long memPtr) { + enter(); + + try { + return PlatformCallbackUtils.affinityFunctionPartition(envPtr, ptr, memPtr); + } + finally { + leave(); + } + } + + /** + * Assigns the affinity partitions. + * + * @param ptr Affinity function pointer. + * @param outMemPtr Pointer to a stream with affinity context. + * @param inMemPtr Pointer to a stream with result. + */ + public void affinityFunctionAssignPartitions(long ptr, long outMemPtr, long inMemPtr){ + enter(); + + try { + PlatformCallbackUtils.affinityFunctionAssignPartitions(envPtr, ptr, outMemPtr, inMemPtr); + } + finally { + leave(); + } + } + + /** + * Removes the node from affinity function. + * + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with node id. + */ + public void affinityFunctionRemoveNode(long ptr, long memPtr) { + enter(); + + try { + PlatformCallbackUtils.affinityFunctionRemoveNode(envPtr, ptr, memPtr); + } + finally { + leave(); + } + } + + /** + * Destroys the affinity function. + * + * @param ptr Affinity function pointer. 
+ */ + public void affinityFunctionDestroy(long ptr) { + if (!lock.enterBusy()) + return; // skip: destroy is not necessary during shutdown. + + try { + PlatformCallbackUtils.affinityFunctionDestroy(envPtr, ptr); + } + finally { + leave(); + } + } + + /** * Enter gateway. */ protected void enter() { http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java index f7d6586..7b36e5e 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/callback/PlatformCallbackUtils.java @@ -17,6 +17,8 @@ package org.apache.ignite.internal.processors.platform.callback; +import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunctionTarget; + /** * Platform callback utility methods. Implemented in target platform. All methods in this class must be * package-visible and invoked only through {@link PlatformCallbackGateway}. @@ -496,6 +498,53 @@ public class PlatformCallbackUtils { static native void onClientReconnected(long envPtr, boolean clusterRestarted); /** + * Initializes affinity function. + * + * @param envPtr Environment pointer. + * @param memPtr Pointer to a stream with serialized affinity function. + * @param baseFunc Optional func for base calls. + * @return Affinity function pointer. + */ + static native long affinityFunctionInit(long envPtr, long memPtr, PlatformAffinityFunctionTarget baseFunc); + + /** + * Gets the partition from affinity function. 
+ * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with key object. + * @return Partition number for a given key. + */ + static native int affinityFunctionPartition(long envPtr, long ptr, long memPtr); + + /** + * Assigns the affinity partitions. + * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + * @param outMemPtr Pointer to a stream with affinity context. + * @param inMemPtr Pointer to a stream with result. + */ + static native void affinityFunctionAssignPartitions(long envPtr, long ptr, long outMemPtr, long inMemPtr); + + /** + * Removes the node from affinity function. + * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + * @param memPtr Pointer to a stream with node id. + */ + static native void affinityFunctionRemoveNode(long envPtr, long ptr, long memPtr); + + /** + * Destroys the affinity function. + * + * @param envPtr Environment pointer. + * @param ptr Affinity function pointer. + */ + static native void affinityFunctionDestroy(long envPtr, long ptr); + + /** * Private constructor. 
*/ private PlatformCallbackUtils() { http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java index db2fa4d..5a7d15d 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/dotnet/PlatformDotNetConfigurationClosure.java @@ -23,14 +23,20 @@ import org.apache.ignite.binary.BinaryBasicIdMapper; import org.apache.ignite.binary.BinaryBasicNameMapper; import org.apache.ignite.binary.BinaryIdMapper; import org.apache.ignite.binary.BinaryNameMapper; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.fair.FairAffinityFunction; +import org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction; import org.apache.ignite.configuration.BinaryConfiguration; +import org.apache.ignite.configuration.CacheConfiguration; import org.apache.ignite.configuration.IgniteConfiguration; import org.apache.ignite.configuration.PlatformConfiguration; import org.apache.ignite.internal.binary.BinaryMarshaller; +import org.apache.ignite.internal.binary.BinaryRawReaderEx; import org.apache.ignite.internal.binary.BinaryRawWriterEx; import org.apache.ignite.internal.binary.BinaryReaderExImpl; import org.apache.ignite.internal.binary.GridBinaryMarshaller; import org.apache.ignite.internal.processors.platform.PlatformAbstractConfigurationClosure; +import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction; 
import org.apache.ignite.internal.processors.platform.lifecycle.PlatformLifecycleBean; import org.apache.ignite.internal.processors.platform.memory.PlatformMemory; import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryManagerImpl; @@ -40,6 +46,7 @@ import org.apache.ignite.internal.processors.platform.utils.PlatformUtils; import org.apache.ignite.internal.util.typedef.internal.U; import org.apache.ignite.lifecycle.LifecycleBean; import org.apache.ignite.marshaller.Marshaller; +import org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction; import org.apache.ignite.platform.dotnet.PlatformDotNetConfiguration; import org.apache.ignite.platform.dotnet.PlatformDotNetLifecycleBean; @@ -189,6 +196,16 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur writer.writeMap(bean.getProperties()); } + // Write .NET affinity funcs + List<PlatformDotNetAffinityFunction> affFuncs = affinityFunctions(igniteCfg); + + writer.writeInt(affFuncs.size()); + + for (PlatformDotNetAffinityFunction func : affFuncs) { + writer.writeString(func.getTypeName()); + writer.writeMap(func.getProperties()); + } + out.synchronize(); gate.extensionCallbackInLongLongOutLong( @@ -240,6 +257,63 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur cfg.setLifecycleBeans(mergedBeans); } } + + // Process affinity functions + List<PlatformDotNetAffinityFunction> affFuncs = affinityFunctions(cfg); + + if (!affFuncs.isEmpty()) { + for (PlatformDotNetAffinityFunction aff : affFuncs) + aff.init(readAffinityFunction(in)); + } + } + + /** + * Reads the affinity function. + * + * @param in Stream. + * @return Affinity function. 
+ */ + private static PlatformAffinityFunction readAffinityFunction(BinaryRawReaderEx in) { + byte plcTyp = in.readByte(); + + if (plcTyp == 0) + return null; + + int partitions = in.readInt(); + boolean exclNeighbours = in.readBoolean(); + byte overrideFlags = in.readByte(); + Object userFunc = in.readObjectDetached(); + + AffinityFunction baseFunc = null; + + switch (plcTyp) { + case 1: { + FairAffinityFunction f = new FairAffinityFunction(); + + f.setPartitions(partitions); + f.setExcludeNeighbors(exclNeighbours); + + baseFunc = f; + + break; + } + + case 2: { + RendezvousAffinityFunction f = new RendezvousAffinityFunction(); + + f.setPartitions(partitions); + f.setExcludeNeighbors(exclNeighbours); + + baseFunc = f; + + break; + } + + default: + assert plcTyp == 3 : "Unknown affinity function policy type: " + plcTyp; + } + + return new PlatformAffinityFunction(userFunc, partitions, overrideFlags, baseFunc); } /** @@ -260,4 +334,25 @@ public class PlatformDotNetConfigurationClosure extends PlatformAbstractConfigur return res; } + + /** + * Find .NET affinity functions in configuration. + * + * @param cfg Configuration. + * @return affinity functions. 
+ */ + private static List<PlatformDotNetAffinityFunction> affinityFunctions(IgniteConfiguration cfg) { + List<PlatformDotNetAffinityFunction> res = new ArrayList<>(); + + CacheConfiguration[] cacheCfg = cfg.getCacheConfiguration(); + + if (cacheCfg != null) { + for (CacheConfiguration ccfg : cacheCfg) { + if (ccfg.getAffinity() instanceof PlatformDotNetAffinityFunction) + res.add((PlatformDotNetAffinityFunction)ccfg.getAffinity()); + } + } + + return res; + } } http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java b/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java new file mode 100644 index 0000000..254c379 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/platform/dotnet/PlatformDotNetAffinityFunction.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.ignite.platform.dotnet; + +import org.apache.ignite.Ignite; +import org.apache.ignite.IgniteException; +import org.apache.ignite.cache.affinity.AffinityFunction; +import org.apache.ignite.cache.affinity.AffinityFunctionContext; +import org.apache.ignite.cluster.ClusterNode; +import org.apache.ignite.internal.processors.platform.cache.affinity.PlatformAffinityFunction; +import org.apache.ignite.lifecycle.LifecycleAware; +import org.apache.ignite.resources.IgniteInstanceResource; + +import java.io.Externalizable; +import java.io.IOException; +import java.io.ObjectInput; +import java.io.ObjectOutput; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +/** + * AffinityFunction implementation which can be used to configure .NET affinity function in Java Spring configuration. + */ +public class PlatformDotNetAffinityFunction implements AffinityFunction, Externalizable, LifecycleAware { + /** */ + private static final long serialVersionUID = 0L; + + /** .NET type name. */ + private transient String typName; + + /** Properties. */ + private transient Map<String, ?> props; + + /** Inner function. */ + private PlatformAffinityFunction func; + + /** + * Gets .NET type name. + * + * @return .NET type name. + */ + public String getTypeName() { + return typName; + } + + /** + * Sets .NET type name. + * + * @param typName .NET type name. + */ + public void setTypeName(String typName) { + this.typName = typName; + } + + /** + * Get properties. + * + * @return Properties. + */ + public Map<String, ?> getProperties() { + return props; + } + + /** + * Set properties. + * + * @param props Properties. 
+ */ + public void setProperties(Map<String, ?> props) { + this.props = props; + } + + /** {@inheritDoc} */ + @Override public void reset() { + assert func != null; + + func.reset(); + } + + /** {@inheritDoc} */ + @Override public int partitions() { + assert func != null; + + return func.partitions(); + } + + /** {@inheritDoc} */ + @Override public int partition(Object key) { + assert func != null; + + return func.partition(key); + } + + /** {@inheritDoc} */ + @Override public List<List<ClusterNode>> assignPartitions(AffinityFunctionContext affCtx) { + assert func != null; + + return func.assignPartitions(affCtx); + } + + /** {@inheritDoc} */ + @Override public void removeNode(UUID nodeId) { + assert func != null; + + func.removeNode(nodeId); + } + + /** {@inheritDoc} */ + @Override public void writeExternal(ObjectOutput out) throws IOException { + out.writeObject(func); + } + + /** {@inheritDoc} */ + @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException { + func = (PlatformAffinityFunction) in.readObject(); + } + + /** + * Initializes this instance. + * + * @param func Underlying func. + */ + public void init(PlatformAffinityFunction func) { + assert func != null; + + this.func = func; + } + + /** {@inheritDoc} */ + @Override public void start() throws IgniteException { + assert func != null; + + func.start(); + } + + /** {@inheritDoc} */ + @Override public void stop() throws IgniteException { + assert func != null; + + func.stop(); + } + + /** + * Injects the Ignite. + * + * @param ignite Ignite. 
+ */ + @SuppressWarnings("unused") + @IgniteInstanceResource + private void setIgnite(Ignite ignite) { + assert func != null; + + func.setIgnite(ignite); + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/cpp/jni/include/ignite/jni/java.h ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/include/ignite/jni/java.h b/modules/platforms/cpp/jni/include/ignite/jni/java.h index 8be8a32..18a15e2 100644 --- a/modules/platforms/cpp/jni/include/ignite/jni/java.h +++ b/modules/platforms/cpp/jni/include/ignite/jni/java.h @@ -101,6 +101,12 @@ namespace ignite typedef void(JNICALL *OnClientDisconnectedHandler)(void* target); typedef void(JNICALL *OnClientReconnectedHandler)(void* target, unsigned char clusterRestarted); + typedef long long(JNICALL *AffinityFunctionInitHandler)(void* target, long long memPtr, void* baseFunc); + typedef int(JNICALL *AffinityFunctionPartitionHandler)(void* target, long long ptr, long long memPtr); + typedef void(JNICALL *AffinityFunctionAssignPartitionsHandler)(void* target, long long ptr, long long inMemPtr, long long outMemPtr); + typedef void(JNICALL *AffinityFunctionRemoveNodeHandler)(void* target, long long ptr, long long memPtr); + typedef void(JNICALL *AffinityFunctionDestroyHandler)(void* target, long long ptr); + /** * JNI handlers holder. 
*/ @@ -178,6 +184,12 @@ namespace ignite OnClientDisconnectedHandler onClientDisconnected; OnClientReconnectedHandler onClientReconnected; + + AffinityFunctionInitHandler affinityFunctionInit; + AffinityFunctionPartitionHandler affinityFunctionPartition; + AffinityFunctionAssignPartitionsHandler affinityFunctionAssignPartitions; + AffinityFunctionRemoveNodeHandler affinityFunctionRemoveNode; + AffinityFunctionDestroyHandler affinityFunctionDestroy; }; /** @@ -534,7 +546,7 @@ namespace ignite jobject ProcessorAtomicReference(jobject obj, char* name, long long memPtr, bool create); void ProcessorGetIgniteConfiguration(jobject obj, long long memPtr); void ProcessorGetCacheNames(jobject obj, long long memPtr); - + long long TargetInStreamOutLong(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL); void TargetInStreamOutStream(jobject obj, int opType, long long inMemPtr, long long outMemPtr, JniErrorInfo* errInfo = NULL); jobject TargetInStreamOutObject(jobject obj, int type, long long memPtr, JniErrorInfo* errInfo = NULL); @@ -738,6 +750,12 @@ namespace ignite JNIEXPORT void JNICALL JniOnClientDisconnected(JNIEnv *env, jclass cls, jlong envPtr); JNIEXPORT void JNICALL JniOnClientReconnected(JNIEnv *env, jclass cls, jlong envPtr, jboolean clusterRestarted); + + JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr, jobject baseFunc); + JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr); + JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr); + JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr); + JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr); } } } 
http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/cpp/jni/src/java.cpp ---------------------------------------------------------------------- diff --git a/modules/platforms/cpp/jni/src/java.cpp b/modules/platforms/cpp/jni/src/java.cpp index 282b874..56e042c 100644 --- a/modules/platforms/cpp/jni/src/java.cpp +++ b/modules/platforms/cpp/jni/src/java.cpp @@ -209,7 +209,7 @@ namespace ignite JniMethod M_PLATFORM_PROCESSOR_EXTENSIONS = JniMethod("extensions", "()Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); JniMethod M_PLATFORM_PROCESSOR_ATOMIC_LONG = JniMethod("atomicLong", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); JniMethod M_PLATFORM_PROCESSOR_ATOMIC_SEQUENCE = JniMethod("atomicSequence", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); - JniMethod M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE = JniMethod("atomicReference", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); + JniMethod M_PLATFORM_PROCESSOR_ATOMIC_REFERENCE = JniMethod("atomicReference", "(Ljava/lang/String;JZ)Lorg/apache/ignite/internal/processors/platform/PlatformTarget;", false); JniMethod M_PLATFORM_PROCESSOR_GET_IGNITE_CONFIGURATION = JniMethod("getIgniteConfiguration", "(J)V", false); JniMethod M_PLATFORM_PROCESSOR_GET_CACHE_NAMES = JniMethod("getCacheNames", "(J)V", false); @@ -361,6 +361,12 @@ namespace ignite JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_DISCONNECTED = JniMethod("onClientDisconnected", "(J)V", true); JniMethod M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_RECONNECTED = JniMethod("onClientReconnected", "(JZ)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT = JniMethod("affinityFunctionInit", "(JJLorg/apache/ignite/internal/processors/platform/cache/affinity/PlatformAffinityFunctionTarget;)J", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION = 
JniMethod("affinityFunctionPartition", "(JJJ)I", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS = JniMethod("affinityFunctionAssignPartitions", "(JJJJ)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE = JniMethod("affinityFunctionRemoveNode", "(JJJ)V", true); + JniMethod M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY = JniMethod("affinityFunctionDestroy", "(JJ)V", true); + const char* C_PLATFORM_UTILS = "org/apache/ignite/internal/processors/platform/utils/PlatformUtils"; JniMethod M_PLATFORM_UTILS_REALLOC = JniMethod("reallocate", "(JI)V", true); JniMethod M_PLATFORM_UTILS_ERR_DATA = JniMethod("errorData", "(Ljava/lang/Throwable;)[B", true); @@ -819,7 +825,7 @@ namespace ignite void RegisterNatives(JNIEnv* env) { { - JNINativeMethod methods[54]; + JNINativeMethod methods[59]; int idx = 0; @@ -896,6 +902,12 @@ namespace ignite AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_DISCONNECTED, reinterpret_cast<void*>(JniOnClientDisconnected)); AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_ON_CLIENT_RECONNECTED, reinterpret_cast<void*>(JniOnClientReconnected)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_INIT, reinterpret_cast<void*>(JniAffinityFunctionInit)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_PARTITION, reinterpret_cast<void*>(JniAffinityFunctionPartition)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_ASSIGN_PARTITIONS, reinterpret_cast<void*>(JniAffinityFunctionAssignPartitions)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_REMOVE_NODE, reinterpret_cast<void*>(JniAffinityFunctionRemoveNode)); + AddNativeMethod(methods + idx++, M_PLATFORM_CALLBACK_UTILS_AFFINITY_FUNCTION_DESTROY, reinterpret_cast<void*>(JniAffinityFunctionDestroy)); + jint res = env->RegisterNatives(FindClass(env, C_PLATFORM_CALLBACK_UTILS), methods, idx); 
if (res != JNI_OK) @@ -968,7 +980,7 @@ namespace ignite int errMsgLen = 0; try { - if (!JVM.GetJvm()) + if (!JVM.GetJvm()) { // 1. Create JVM itself. jint res = CreateJvm(opts, optsLen, &jvm, &env); @@ -2813,7 +2825,7 @@ namespace ignite JNIEXPORT jlong JNICALL JniExtensionCallbackInLongLongOutLong(JNIEnv *env, jclass cls, jlong envPtr, jint typ, jlong arg1, jlong arg2) { IGNITE_SAFE_FUNC(env, envPtr, ExtensionCallbackInLongLongOutLongHandler, extensionCallbackInLongLongOutLong, typ, arg1, arg2); } - + JNIEXPORT void JNICALL JniOnClientDisconnected(JNIEnv *env, jclass cls, jlong envPtr) { IGNITE_SAFE_PROC_NO_ARG(env, envPtr, OnClientDisconnectedHandler, onClientDisconnected); } @@ -2821,6 +2833,27 @@ namespace ignite JNIEXPORT void JNICALL JniOnClientReconnected(JNIEnv *env, jclass cls, jlong envPtr, jboolean clusterRestarted) { IGNITE_SAFE_PROC(env, envPtr, OnClientReconnectedHandler, onClientReconnected, clusterRestarted); } + + JNIEXPORT jlong JNICALL JniAffinityFunctionInit(JNIEnv *env, jclass cls, jlong envPtr, jlong memPtr, jobject baseFunc) { + void* baseFuncRef = baseFunc ? 
env->NewGlobalRef(baseFunc) : nullptr; + IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionInitHandler, affinityFunctionInit, memPtr, baseFuncRef); + } + + JNIEXPORT jint JNICALL JniAffinityFunctionPartition(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) { + IGNITE_SAFE_FUNC(env, envPtr, AffinityFunctionPartitionHandler, affinityFunctionPartition, ptr, memPtr); + } + + JNIEXPORT void JNICALL JniAffinityFunctionAssignPartitions(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong inMemPtr, jlong outMemPtr) { + IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionAssignPartitionsHandler, affinityFunctionAssignPartitions, ptr, inMemPtr, outMemPtr); + } + + JNIEXPORT void JNICALL JniAffinityFunctionRemoveNode(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr, jlong memPtr) { + IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionRemoveNodeHandler, affinityFunctionRemoveNode, ptr, memPtr); + } + + JNIEXPORT void JNICALL JniAffinityFunctionDestroy(JNIEnv *env, jclass cls, jlong envPtr, jlong ptr) { + IGNITE_SAFE_PROC(env, envPtr, AffinityFunctionDestroyHandler, affinityFunctionDestroy, ptr); + } } } } http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj index b937d28..f5eef65 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Apache.Ignite.Core.Tests.csproj @@ -58,6 +58,7 @@ <Compile Include="Binary\BinarySelfTestFullFooter.cs" /> <Compile Include="Cache\CacheAffinityFieldTest.cs" /> <Compile Include="Cache\CacheConfigurationTest.cs" /> + <Compile Include="Cache\Affinity\AffinityFunctionSpringTest.cs" /> <Compile 
Include="Cache\CacheDynamicStartTest.cs" /> <Compile Include="Cache\CacheNearTest.cs" /> <Compile Include="Cache\CacheTestAsyncWrapper.cs" /> @@ -200,6 +201,12 @@ <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> <SubType>Designer</SubType> </Content> + <Content Include="Config\Cache\Affinity\affinity-function2.xml"> + <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> + </Content> + <Content Include="Config\Cache\Affinity\affinity-function.xml"> + <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> + </Content> <Content Include="Config\Cache\Store\cache-store-session.xml"> <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory> </Content> http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs new file mode 100644 index 0000000..7b317ac --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Cache/Affinity/AffinityFunctionSpringTest.cs @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// ReSharper disable UnusedAutoPropertyAccessor.Local +// ReSharper disable UnusedMember.Local +namespace Apache.Ignite.Core.Tests.Cache.Affinity +{ + using System; + using System.Collections.Generic; + using System.Linq; + using Apache.Ignite.Core.Cache; + using Apache.Ignite.Core.Cache.Affinity; + using Apache.Ignite.Core.Cache.Affinity.Fair; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Resource; + using NUnit.Framework; + + /// <summary> + /// Tests AffinityFunction defined in Spring XML. + /// </summary> + public class AffinityFunctionSpringTest : IgniteTestBase + { + /// <summary> + /// Initializes a new instance of the <see cref="AffinityFunctionSpringTest"/> class. + /// </summary> + public AffinityFunctionSpringTest() : base(6, + "config\\cache\\affinity\\affinity-function.xml", + "config\\cache\\affinity\\affinity-function2.xml") + { + // No-op. + } + + /// <summary> + /// Tests the static cache. + /// </summary> + [Test] + public void TestStaticCache() + { + ValidateAffinityFunction(Grid.GetCache<long, int>("cache1")); + ValidateAffinityFunction(Grid2.GetCache<long, int>("cache1")); + ValidateAffinityFunction(Grid.GetCache<long, int>("cache2")); + ValidateAffinityFunction(Grid2.GetCache<long, int>("cache2")); + } + + /// <summary> + /// Tests the dynamic cache. 
+ /// </summary> + [Test] + public void TestDynamicCache() + { + ValidateAffinityFunction(Grid.CreateCache<long, int>("dyn-cache-1")); + ValidateAffinityFunction(Grid2.GetCache<long, int>("dyn-cache-1")); + + ValidateAffinityFunction(Grid2.CreateCache<long, int>("dyn-cache-2")); + ValidateAffinityFunction(Grid.GetCache<long, int>("dyn-cache-2")); + + ValidateAffinityFunction(Grid.CreateCache<long, int>("dyn-cache2-1")); + ValidateAffinityFunction(Grid2.GetCache<long, int>("dyn-cache2-1")); + + ValidateAffinityFunction(Grid2.CreateCache<long, int>("dyn-cache2-2")); + ValidateAffinityFunction(Grid.GetCache<long, int>("dyn-cache2-2")); + } + + /// <summary> + /// Validates the affinity function. + /// </summary> + /// <param name="cache">The cache.</param> + private static void ValidateAffinityFunction(ICache<long, int> cache) + { + var aff = cache.Ignite.GetAffinity(cache.Name); + + Assert.AreEqual(5, aff.Partitions); + + // Predefined map + Assert.AreEqual(2, aff.GetPartition(1L)); + Assert.AreEqual(1, aff.GetPartition(2L)); + + // Other keys + Assert.AreEqual(1, aff.GetPartition(13L)); + Assert.AreEqual(3, aff.GetPartition(4L)); + } + + private class TestFunc : IAffinityFunction // [Serializable] is not necessary + { + [InstanceResource] + private readonly IIgnite _ignite = null; + + private int Property1 { get; set; } + + private string Property2 { get; set; } + + public int Partitions + { + get { return 5; } + } + + public int GetPartition(object key) + { + Assert.IsNotNull(_ignite); + Assert.AreEqual(1, Property1); + Assert.AreEqual("1", Property2); + + var longKey = (long)key; + int res; + + if (TestFairFunc.PredefinedParts.TryGetValue(longKey, out res)) + return res; + + return (int)(longKey * 2 % 5); + } + + // ReSharper disable once UnusedParameter.Local + public void RemoveNode(Guid nodeId) + { + // No-op. 
+ } + + public IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context) + { + return Enumerable.Range(0, Partitions).Select(x => context.CurrentTopologySnapshot); + } + } + + private class TestFairFunc : FairAffinityFunction // [Serializable] is not necessary + { + public static readonly Dictionary<long, int> PredefinedParts = new Dictionary<long, int> + { + {1, 2}, + {2, 1} + }; + + [InstanceResource] + private readonly IIgnite _ignite = null; + + private int Property1 { get; set; } + + private string Property2 { get; set; } + + public override int GetPartition(object key) + { + Assert.IsNotNull(_ignite); + Assert.AreEqual(1, Property1); + Assert.AreEqual("1", Property2); + + Assert.IsInstanceOf<long>(key); + + var basePart = base.GetPartition(key); + Assert.Greater(basePart, -1); + Assert.Less(basePart, Partitions); + + var longKey = (long) key; + int res; + + if (PredefinedParts.TryGetValue(longKey, out res)) + return res; + + return (int) (longKey * 2 % 5); + } + + public override IEnumerable<IEnumerable<IClusterNode>> AssignPartitions(AffinityFunctionContext context) + { + var baseRes = base.AssignPartitions(context).ToList(); // test base call + + Assert.AreEqual(Partitions, baseRes.Count); + + return baseRes; + } + } + } +} http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml new file mode 100644 index 0000000..67ff128 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function.xml @@ -0,0 +1,129 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or 
more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/util + http://www.springframework.org/schema/util/spring-util.xsd"> + <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + <property name="localHost" value="127.0.0.1"/> + <property name="connectorConfiguration"> + <null/> + </property> + + <property name="cacheConfiguration"> + <list> + <bean class="org.apache.ignite.configuration.CacheConfiguration"> + <property name="name" value="cache1"/> + + <property name="affinity"> + <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction"> + <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Affinity.AffinityFunctionSpringTest+TestFunc, Apache.Ignite.Core.Tests"/> + <property name="properties"> + <map> + <entry key="Property1"> + <value type="java.lang.Integer">1</value> + </entry> + <entry key="Property2" value="1"/> + </map> + </property> + </bean> + </property> + </bean> + <bean 
class="org.apache.ignite.configuration.CacheConfiguration"> + <property name="name" value="dyn-cache-*"/> + + <property name="affinity"> + <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction"> + <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Affinity.AffinityFunctionSpringTest+TestFunc, Apache.Ignite.Core.Tests"/> + <property name="properties"> + <map> + <entry key="Property1"> + <value type="java.lang.Integer">1</value> + </entry> + <entry key="Property2" value="1"/> + </map> + </property> + </bean> + </property> + </bean> + <bean class="org.apache.ignite.configuration.CacheConfiguration"> + <property name="name" value="cache2"/> + + <property name="affinity"> + <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction"> + <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Affinity.AffinityFunctionSpringTest+TestFairFunc, Apache.Ignite.Core.Tests"/> + <property name="properties"> + <map> + <entry key="Property1"> + <value type="java.lang.Integer">1</value> + </entry> + <entry key="Property2" value="1"/> + <entry key="Partitions"> + <value type="java.lang.Integer">5</value> + </entry> + </map> + </property> + </bean> + </property> + </bean> + <bean class="org.apache.ignite.configuration.CacheConfiguration"> + <property name="name" value="dyn-cache2-*"/> + + <property name="affinity"> + <bean class="org.apache.ignite.platform.dotnet.PlatformDotNetAffinityFunction"> + <property name="typeName" value="Apache.Ignite.Core.Tests.Cache.Affinity.AffinityFunctionSpringTest+TestFairFunc, Apache.Ignite.Core.Tests"/> + <property name="properties"> + <map> + <entry key="Property1"> + <value type="java.lang.Integer">1</value> + </entry> + <entry key="Property2" value="1"/> + <entry key="Partitions"> + <value type="java.lang.Integer">5</value> + </entry> + </map> + </property> + </bean> + </property> + </bean> + </list> + </property> + + <property name="discoverySpi"> + <bean 
class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <property name="addresses"> + <list> + <!-- In distributed environment, replace with actual host IP address. --> + <value>127.0.0.1:47500</value> + </list> + </property> + </bean> + </property> + <property name="socketTimeout" value="300" /> + </bean> + </property> + </bean> +</beans> http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml new file mode 100644 index 0000000..cab34b5 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/Config/Cache/Affinity/affinity-function2.xml @@ -0,0 +1,49 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xmlns:util="http://www.springframework.org/schema/util" + xsi:schemaLocation=" + http://www.springframework.org/schema/beans + http://www.springframework.org/schema/beans/spring-beans.xsd + http://www.springframework.org/schema/util + http://www.springframework.org/schema/util/spring-util.xsd"> + <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration"> + <property name="localHost" value="127.0.0.1"/> + <property name="connectorConfiguration"><null/></property> + <property name="gridName" value="grid2" /> + + <property name="discoverySpi"> + <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi"> + <property name="ipFinder"> + <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> + <property name="addresses"> + <list> + <!-- In distributed environment, replace with actual host IP address. --> + <value>127.0.0.1:47500</value> + </list> + </property> + </bean> + </property> + <property name="socketTimeout" value="300" /> + </bean> + </property> + </bean> +</beans> http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs index 74ea846..2537445 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs +++ b/modules/platforms/dotnet/Apache.Ignite.Core.Tests/TestRunner.cs @@ -20,6 +20,7 @@ namespace Apache.Ignite.Core.Tests using System; using System.Diagnostics; using System.Reflection; + using Apache.Ignite.Core.Tests.Cache.Affinity; using Apache.Ignite.Core.Tests.Cache.Query; using Apache.Ignite.Core.Tests.Memory; using NUnit.ConsoleRunner; @@ -34,7 +35,7 @@ namespace 
Apache.Ignite.Core.Tests TestOne(typeof(CacheLinqTest), "TestExcept"); - //TestAll(typeof (CacheQueriesCodeConfigurationTest)); + TestAll(typeof (AffinityFunctionSpringTest)); //TestAllInAssembly(); } http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj index 649de2e..823f9f2 100644 --- a/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Apache.Ignite.Core.csproj @@ -87,13 +87,25 @@ <Compile Include="Binary\Package-Info.cs" /> <Compile Include="Cache\Affinity\AffinityKey.cs" /> <Compile Include="Cache\Affinity\AffinityKeyMappedAttribute.cs" /> + <Compile Include="Cache\Affinity\AffinityFunctionContext.cs" /> + <Compile Include="Cache\Affinity\AffinityTopologyVersion.cs" /> + <Compile Include="Cache\Affinity\Fair\FairAffinityFunction.cs" /> + <Compile Include="Cache\Affinity\Fair\Package-Info.cs" /> + <Compile Include="Cache\Affinity\IAffinityFunction.cs" /> + <Compile Include="Cache\Affinity\Package-Info.cs" /> + <Compile Include="Cache\Affinity\Rendezvous\Package-Info.cs" /> + <Compile Include="Cache\Affinity\Rendezvous\RendezvousAffinityFunction.cs" /> <Compile Include="Cache\CacheAtomicUpdateTimeoutException.cs" /> <Compile Include="Cache\CacheEntryProcessorException.cs" /> <Compile Include="Cache\CacheException.cs" /> <Compile Include="Cache\CachePartialUpdateException.cs" /> <Compile Include="Cache\CachePeekMode.cs" /> <Compile Include="Cache\Configuration\NearCacheConfiguration.cs" /> + <Compile Include="Cache\Configuration\Package-Info.cs" /> + <Compile Include="Cache\Eviction\Package-Info.cs" /> <Compile Include="Communication\ICommunicationSpi.cs" /> + <Compile 
Include="Communication\Package-Info.cs" /> + <Compile Include="Communication\Tcp\Package-Info.cs" /> <Compile Include="Communication\Tcp\TcpCommunicationSpi.cs" /> <Compile Include="DataStructures\Configuration\AtomicConfiguration.cs" /> <Compile Include="Cache\Configuration\QueryAlias.cs" /> @@ -153,8 +165,12 @@ <Compile Include="Common\IgniteGuid.cs" /> <Compile Include="Common\Package-Info.cs" /> <Compile Include="Impl\Binary\BinaryWriterExtensions.cs" /> + <Compile Include="Impl\Cache\Affinity\AffinityFunctionBase.cs" /> <Compile Include="Impl\Common\Platform.cs" /> + <Compile Include="Impl\Cache\Affinity\AffinityFunctionSerializer.cs" /> + <Compile Include="Impl\Cache\Affinity\PlatformAffinityFunction.cs" /> <Compile Include="Impl\Cache\Event\JavaCacheEntryEventFilter.cs" /> + <Compile Include="Impl\Common\ObjectInfoHolder.cs" /> <Compile Include="Impl\Common\PlatformJavaObjectFactoryProxy.cs" /> <Compile Include="Compute\ComputeExecutionRejectedException.cs" /> <Compile Include="Compute\ComputeJobAdapter.cs" /> http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs new file mode 100644 index 0000000..6067af4 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityFunctionContext.cs @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Cache.Affinity +{ + using System.Collections.Generic; + using System.Diagnostics; + using Apache.Ignite.Core.Binary; + using Apache.Ignite.Core.Cluster; + using Apache.Ignite.Core.Events; + using Apache.Ignite.Core.Impl; + + /// <summary> + /// Affinity function context. + /// </summary> + public class AffinityFunctionContext + { + /** */ + private readonly List<List<IClusterNode>> _previousAssignment; + + /** */ + private readonly int _backups; + + /** */ + private readonly ICollection<IClusterNode> _currentTopologySnapshot; + + /** */ + private readonly AffinityTopologyVersion _currentTopologyVersion; + + /** */ + private readonly DiscoveryEvent _discoveryEvent; + + /// <summary> + /// Initializes a new instance of the <see cref="AffinityFunctionContext"/> class. + /// </summary> + /// <param name="reader">The reader.</param> + internal AffinityFunctionContext(IBinaryRawReader reader) + { + Debug.Assert(reader != null); + + _currentTopologySnapshot = IgniteUtils.ReadNodes(reader); + _backups = reader.ReadInt(); + _currentTopologyVersion = new AffinityTopologyVersion(reader.ReadLong(), reader.ReadInt()); + _discoveryEvent = EventReader.Read<DiscoveryEvent>(reader); + + // Prev assignment + var cnt = reader.ReadInt(); + + if (cnt > 0) + { + _previousAssignment = new List<List<IClusterNode>>(cnt); + + for (var i = 0; i < cnt; i++) + _previousAssignment.Add(IgniteUtils.ReadNodes(reader)); + } + } + + /// <summary> + /// Gets the affinity assignment for given partition on previous topology version. 
+ /// First node in returned list is a primary node, other nodes are backups. + /// </summary> + /// <param name="partition">The partition to get previous assignment for.</param> + /// <returns> + /// List of nodes assigned to a given partition on previous topology version or <c>null</c> + /// if this information is not available. + /// </returns> + public ICollection<IClusterNode> GetPreviousAssignment(int partition) + { + return _previousAssignment == null ? null : _previousAssignment[partition]; + } + + /// <summary> + /// Gets number of backups for new assignment. + /// </summary> + public int Backups + { + get { return _backups; } + } + + /// <summary> + /// Gets the current topology snapshot. Snapshot will contain only nodes on which the particular + /// cache is configured. List of passed nodes is guaranteed to be sorted in the same order + /// on all nodes on which partition assignment is performed. + /// </summary> + public ICollection<IClusterNode> CurrentTopologySnapshot + { + get { return _currentTopologySnapshot; } + } + + /// <summary> + /// Gets the current topology version. + /// </summary> + public AffinityTopologyVersion CurrentTopologyVersion + { + get { return _currentTopologyVersion; } + } + + /// <summary> + /// Gets the discovery event that caused the topology change. 
+ /// </summary> + public DiscoveryEvent DiscoveryEvent + { + get { return _discoveryEvent; } + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs new file mode 100644 index 0000000..9bfdfb4 --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/AffinityTopologyVersion.cs @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Cache.Affinity +{ + using System; + using Apache.Ignite.Core.Cluster; + + /// <summary> + /// Affinity topology version. + /// </summary> + public struct AffinityTopologyVersion : IEquatable<AffinityTopologyVersion> + { + /** */ + private readonly long _version; + + /** */ + private readonly int _minorVersion; + + /// <summary> + /// Initializes a new instance of the <see cref="AffinityTopologyVersion"/> struct. 
+ /// </summary> + /// <param name="version">The version.</param> + /// <param name="minorVersion">The minor version.</param> + public AffinityTopologyVersion(long version, int minorVersion) + { + _version = version; + _minorVersion = minorVersion; + } + + /// <summary> + /// Gets the major version, same as <see cref="ICluster.TopologyVersion"/>. + /// </summary> + public long Version + { + get { return _version; } + } + + /// <summary> + /// Gets the minor version, which is increased when new caches start. + /// </summary> + public int MinorVersion + { + get { return _minorVersion; } + } + + /// <summary> + /// Indicates whether the current object is equal to another object of the same type. + /// </summary> + /// <param name="other">An object to compare with this object.</param> + /// <returns> + /// true if the current object is equal to the <paramref name="other" /> parameter; otherwise, false. + /// </returns> + public bool Equals(AffinityTopologyVersion other) + { + return _version == other._version && _minorVersion == other._minorVersion; + } + + /// <summary> + /// Determines whether the specified <see cref="System.Object" /> is equal to this instance. + /// </summary> + /// <param name="obj">The <see cref="System.Object" /> to compare with this instance.</param> + /// <returns> + /// <c>true</c> if the specified <see cref="System.Object" /> is equal to this instance; otherwise, + /// <c>false</c>. + /// </returns> + public override bool Equals(object obj) + { + if (ReferenceEquals(null, obj)) return false; + return obj is AffinityTopologyVersion && Equals((AffinityTopologyVersion) obj); + } + + /// <summary> + /// Returns a hash code for this instance. + /// </summary> + /// <returns> + /// A hash code for this instance, suitable for use in hashing algorithms and data structures like a hash table. 
+ /// </returns> + public override int GetHashCode() + { + unchecked + { + return (_version.GetHashCode()*397) ^ _minorVersion; + } + } + + /// <summary> + /// Implements the operator ==. + /// </summary> + /// <param name="left">The left.</param> + /// <param name="right">The right.</param> + /// <returns> + /// The result of the operator. + /// </returns> + public static bool operator ==(AffinityTopologyVersion left, AffinityTopologyVersion right) + { + return left.Equals(right); + } + + /// <summary> + /// Implements the operator !=. + /// </summary> + /// <param name="left">The left.</param> + /// <param name="right">The right.</param> + /// <returns> + /// The result of the operator. + /// </returns> + public static bool operator !=(AffinityTopologyVersion left, AffinityTopologyVersion right) + { + return !left.Equals(right); + } + + /// <summary> + /// Returns a <see cref="string" /> that represents this instance. + /// </summary> + /// <returns> + /// A <see cref="string" /> that represents this instance. + /// </returns> + public override string ToString() + { + return string.Format("AffinityTopologyVersion [Version={0}, MinorVersion={1}]", _version, _minorVersion); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs new file mode 100644 index 0000000..f06937d --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/FairAffinityFunction.cs @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Apache.Ignite.Core.Cache.Affinity.Fair +{ + using System; + using Apache.Ignite.Core.Impl.Cache.Affinity; + + /// <summary> + /// Fair affinity function which tries to ensure that all nodes get equal number of partitions with + /// minimum amount of reassignments between existing nodes. + /// </summary> + [Serializable] + public class FairAffinityFunction : AffinityFunctionBase + { + // No-op. + // Actual implementation is in Java, see AffinityFunctionSerializer.Write method. + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/ignite/blob/51add504/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/Package-Info.cs ---------------------------------------------------------------------- diff --git a/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/Package-Info.cs b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/Package-Info.cs new file mode 100644 index 0000000..29a21bd --- /dev/null +++ b/modules/platforms/dotnet/Apache.Ignite.Core/Cache/Affinity/Fair/Package-Info.cs @@ -0,0 +1,26 @@ +/* +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. 
+* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +#pragma warning disable 1587 // invalid XML comment + +/// <summary> +/// Fair affinity API. +/// </summary> +namespace Apache.Ignite.Core.Cache.Affinity.Fair +{ + // No-op. +}