WIP on closure execution logic.

Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/92f7d9d0
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/92f7d9d0
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/92f7d9d0

Branch: refs/heads/ignite-3553
Commit: 92f7d9d086d93ed97cc74252e670e1de3152fd9f
Parents: d67d8e8
Author: vozerov-gridgain <[email protected]>
Authored: Wed Jul 27 11:03:39 2016 +0300
Committer: vozerov-gridgain <[email protected]>
Committed: Wed Jul 27 11:03:39 2016 +0300

----------------------------------------------------------------------
 .../client/IgfsClientClosureInOperation.java    | 64 --------------------
 .../igfs/client/IgfsClientClosureManager.java   | 43 ++++++++++++-
 2 files changed, 41 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/92f7d9d0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureInOperation.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureInOperation.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureInOperation.java
deleted file mode 100644
index 13a34ed..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureInOperation.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.igfs.client;
-
-import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.util.typedef.internal.S;
-
-import java.util.UUID;
-
-/**
- * IGFS client closure incoming opeartion descriptor.
- */
-public class IgfsClientClosureInOperation {
-    /** Target node ID. */
-    private final UUID nodeId;
-
-    /** Target operation. */
-    private final IgfsClientAbstractCallable target;
-
-    /**
-     * Constructor.
-     *
-     * @param nodeId Target node ID.
-     * @param target Target operation.
-     */
-    public IgfsClientClosureInOperation(UUID nodeId, IgfsClientAbstractCallable target) {
-        this.nodeId = nodeId;
-        this.target = target;
-    }
-
-    /**
-     * @return Target node ID.
-     */
-    public UUID nodeId() {
-        return nodeId;
-    }
-
-    /**
-     * @return Target operation.
-     */
-    public IgfsClientAbstractCallable target() {
-        return target;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        return S.toString(IgfsClientClosureInOperation.class, this);
-    }
-}

http://git-wip-us.apache.org/repos/asf/ignite/blob/92f7d9d0/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java
index 1065d5e..cdb77031 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/client/IgfsClientClosureManager.java
@@ -22,7 +22,7 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.processors.igfs.IgfsContext;
 import org.apache.ignite.internal.processors.igfs.IgfsManager;
-import org.apache.ignite.internal.util.GridStripedSpinBusyLock;
+import org.apache.ignite.internal.util.StripedCompositeReadWriteLock;
 import org.apache.ignite.internal.util.typedef.internal.S;
 import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.marshaller.Marshaller;
@@ -38,7 +38,7 @@ import java.util.concurrent.ConcurrentLinkedDeque;
  */
 public class IgfsClientClosureManager extends IgfsManager {
     /** Pending input operations received when manager is not started yet. */
-    private final ConcurrentLinkedDeque<IgfsClientClosureInOperation> inOps = new ConcurrentLinkedDeque<>();
+    private final ConcurrentLinkedDeque<IgfsClientClosureRequest> pending = new ConcurrentLinkedDeque<>();
 
     /** Outgoing operations. */
     private final Map<Long, IgfsClientClosureOutOperation> outOps = new ConcurrentHashMap<>();
@@ -46,9 +46,16 @@ public class IgfsClientClosureManager extends IgfsManager {
     /** Marshaller. */
     private final Marshaller marsh;
 
+    /** Whether manager is fully started and ready to process requests. */
+    private volatile boolean ready;
+
     /** Stopping flag. */
     private volatile boolean stopping;
 
+    /** RW lock for synchronization. */
+    private final StripedCompositeReadWriteLock rwLock =
+        new StripedCompositeReadWriteLock(Runtime.getRuntime().availableProcessors() * 2);
+
     /**
      * Constructor.
      *
@@ -99,6 +106,7 @@ public class IgfsClientClosureManager extends IgfsManager {
      * @return Future.
      */
     public <T> IgniteInternalFuture<T> executeAsync(IgfsContext igfsCtx, IgfsClientAbstractCallable<T> clo) {
+
         // TODO
 
         return null;
@@ -146,6 +154,37 @@ public class IgfsClientClosureManager extends IgfsManager {
         // TODO
     }
 
+    /**
+     * Handle closure request.
+     *
+     * @param req Request.
+     */
+    private void onClosureRequest(IgfsClientClosureRequest req) {
+        rwLock.readLock().lock();
+
+        try {
+            if (stopping)
+                return; // Discovery listener on remote node will handle node leave.
+
+            if (ready)
+                onClosureRequest0(req); // Normal execution flow.
+            else
+                pending.add(req); // Add to pending set if manager is not fully started yet.
+        }
+        finally {
+            rwLock.readLock().unlock();
+        }
+    }
+
+    /**
+     * Actual request processing. Happens inside appropriate thread pool.
+     *
+     * @param req Request.
+     */
+    private void onClosureRequest0(IgfsClientClosureRequest req) {
+        // TODO
+    }
+
     /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(IgfsClientClosureManager.class, this);

Reply via email to