Added node selection strategy.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5169cbf5 Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5169cbf5 Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5169cbf5 Branch: refs/heads/ignite-3553 Commit: 5169cbf578c144c07b82984ab5d63f3e14435e1a Parents: 4cebd51 Author: vozerov-gridgain <[email protected]> Authored: Wed Jul 27 16:25:55 2016 +0300 Committer: vozerov-gridgain <[email protected]> Committed: Wed Jul 27 16:25:55 2016 +0300 ---------------------------------------------------------------------- .../igfs/client/IgfsClientManager.java | 50 +++++++++++++++----- .../client/IgfsClientNodeSelectionStrategy.java | 44 +++++++++++++++++ 2 files changed, 82 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ignite/blob/5169cbf5/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java index db5b45d..05bbec1 100644 --- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientManager.java @@ -58,12 +58,6 @@ public class IgfsClientManager extends IgfsManager { /** Marshaller. */ private final Marshaller marsh; - /** Whether manager is fully started and ready to process requests. */ - private volatile boolean ready; - - /** Stopping flag. */ - private volatile boolean stopping; - /** RW lock for synchronization. */ private final StripedCompositeReadWriteLock rwLock = new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors() * 2); @@ -75,11 +69,17 @@ public class IgfsClientManager extends IgfsManager { private final MessageListener msgLsnr = new MessageListener(); /** Pending input operations received when manager is not started yet. */ - private final ConcurrentLinkedDeque<IgfsClientInOperation> pending = new ConcurrentLinkedDeque<>(); + private final ConcurrentLinkedDeque<IgfsClientInOperation> pendingOps = new ConcurrentLinkedDeque<>(); /** Worker to process pending requests. */ private PendingRequestsWorker pendingWorker; + /** Whether manager is fully started and ready to process requests. */ + private volatile boolean ready; + + /** Stopping flag. */ + private volatile boolean stopping; + /** * Constructor. * @@ -105,7 +105,7 @@ public class IgfsClientManager extends IgfsManager { try { ready = true; - if (!pending.isEmpty()) { + if (!pendingOps.isEmpty()) { pendingWorker = new PendingRequestsWorker(ctx.gridName(), "igfs-client-pending-request-worker", log); new IgniteThread(pendingWorker).start(); @@ -143,7 +143,7 @@ public class IgfsClientManager extends IgfsManager { /** {@inheritDoc} */ @Override protected void stop0(boolean cancel) { - pending.clear(); + pendingOps.clear(); outOps.clear(); } @@ -155,7 +155,20 @@ public class IgfsClientManager extends IgfsManager { * @return Result. */ public <T> T execute(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo) throws IgniteCheckedException { - return executeAsync(igfsCtx, clo).get(); + return execute(igfsCtx, clo, IgfsClientNodeSelectionStrategy.RANDOM); + } + + /** + * Execute IGFS closure. + * + * @param igfsCtx IGFS context. + * @param clo Closure. + * @param strategy Node selection strategy. + * @return Result. + */ + public <T> T execute(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo, + IgfsClientNodeSelectionStrategy strategy) throws IgniteCheckedException { + return executeAsync(igfsCtx, clo, strategy).get(); } /** @@ -166,6 +179,19 @@ public class IgfsClientManager extends IgfsManager { * @return Future. */ public <T> IgniteInternalFuture<T> executeAsync(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo) { + return executeAsync(igfsCtx, clo, IgfsClientNodeSelectionStrategy.RANDOM); + } + + /** + * Execute IGFS closure asynchronously. + * + * @param igfsCtx IGFS context. + * @param clo Closure. + * @param strategy Node selection strategy. + * @return Future. + */ + public <T> IgniteInternalFuture<T> executeAsync(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo, + IgfsClientNodeSelectionStrategy strategy) { // TODO @@ -228,7 +254,7 @@ public class IgfsClientManager extends IgfsManager { processRequest(nodeId, req); // Normal execution flow. else // Add to pending set if manager is not operational yet. - pending.addLast(new IgfsClientInOperation(nodeId, req)); + pendingOps.addLast(new IgfsClientInOperation(nodeId, req)); } finally { rwLock.readLock().unlock(); @@ -392,7 +418,7 @@ public class IgfsClientManager extends IgfsManager { @Override protected void body() throws InterruptedException, IgniteInterruptedCheckedException { IgfsClientInOperation inOp; - while ((inOp = pending.pollFirst()) != null) + while ((inOp = pendingOps.pollFirst()) != null) processRequest(inOp.nodeId(), inOp.request()); } } http://git-wip-us.apache.org/repos/asf/ignite/blob/5169cbf5/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientNodeSelectionStrategy.java ---------------------------------------------------------------------- diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientNodeSelectionStrategy.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientNodeSelectionStrategy.java new file mode 100644 index 0000000..e25f340 --- /dev/null +++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientNodeSelectionStrategy.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.processors.igfs.client; + +import org.jetbrains.annotations.Nullable; + +/** + * IGFS node selection strategy. + */ +public enum IgfsClientNodeSelectionStrategy { + /** Pick random node. */ + RANDOM, + + /** Pick a node where root ID resides. */ + ROOT_ID; + + /** Enum values. */ + private static final IgfsClientNodeSelectionStrategy[] VALS = values(); + + /** + * Efficiently gets enumerated value from its ordinal. + * + * @param ord Ordinal value. + * @return Enumerated value. + */ + @Nullable public static IgfsClientNodeSelectionStrategy fromOrdinal(int ord) { + return ord >= 0 && ord < VALS.length ? VALS[ord] : null; + } +}
