Repository: asterixdb
Updated Branches:
  refs/heads/master 929344e93 -> 214d3735d


[NO ISSUE][TEST] Add NC Storage API Test

- user model changes: no
- storage format changes: no
- interface changes: yes
  - Add IPartitionReplica for use at the
    API level.
  - Rename IStorageSubsystem -> IReplicaManager

Details:
- Add an option to TestExecutor to target a
  specific NC endpoint.
- Add storage API test case.

Change-Id: I76c336a66e32036a34d30ce6d3a31195c342c4a9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2195
Sonar-Qube: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/214d3735
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/214d3735
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/214d3735

Branch: refs/heads/master
Commit: 214d3735d669e5e7ecc2c7b58e108f8d7da9a2cb
Parents: 929344e
Author: Murtadha Hubail <[email protected]>
Authored: Fri Dec 8 02:19:39 2017 +0300
Committer: Murtadha Hubail <[email protected]>
Committed: Thu Dec 7 19:22:46 2017 -0800

----------------------------------------------------------------------
 .../api/http/server/StorageApiServlet.java      | 19 ++--
 .../asterix/app/nc/NCAppRuntimeContext.java     | 10 +-
 .../apache/asterix/app/nc/ReplicaManager.java   | 77 ++++++++++++++++
 .../apache/asterix/app/nc/StorageSubsystem.java | 75 ---------------
 .../asterix/test/common/TestExecutor.java       | 68 +++++++++++++-
 .../test/runtime/ReplicationExecutionTest.java  | 82 +++++++++++++++++
 .../add_replica/add_replica.1.sto.cmd           | 19 ++++
 .../add_replica/add_replica.2.get.http          | 19 ++++
 .../test/resources/runtimets/replication.xml    | 28 ++++++
 .../replication/add_replica/add_replica.2.adm   |  7 ++
 .../common/api/INcApplicationContext.java       |  7 +-
 .../common/exceptions/ReplicationException.java | 26 ++++++
 .../common/replication/IPartitionReplica.java   | 47 ++++++++++
 .../asterix/common/storage/IReplicaManager.java | 57 ++++++++++++
 .../common/storage/IStorageSubsystem.java       | 55 -----------
 .../apache/asterix/common/utils/Servlets.java   |  4 +
 asterixdb/asterix-replication/pom.xml           |  8 ++
 .../replication/storage/PartitionReplica.java   | 97 ++++++++++++++++++++
 18 files changed, 553 insertions(+), 152 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d8636c8..8e73405 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -30,8 +30,8 @@ import java.util.logging.Logger;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.PartitionReplica;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
 import org.apache.hyracks.http.api.IServletRequest;
 import org.apache.hyracks.http.api.IServletResponse;
@@ -109,17 +109,18 @@ public class StorageApiServlet extends AbstractServlet {
 
     private JsonNode getStatus(Predicate<Integer> predicate) {
         final ArrayNode status = OBJECT_MAPPER.createArrayNode();
-        final IStorageSubsystem storageSubsystem = 
appCtx.getStorageSubsystem();
+        final IReplicaManager storageSubsystem = appCtx.getReplicaManager();
         final Set<Integer> partitions =
                 
storageSubsystem.getPartitions().stream().filter(predicate).collect(Collectors.toSet());
         for (Integer partition : partitions) {
             final ObjectNode partitionJson = OBJECT_MAPPER.createObjectNode();
             partitionJson.put("partition", partition);
-            final List<PartitionReplica> replicas = 
storageSubsystem.getReplicas(partition);
+            final List<IPartitionReplica> replicas = 
storageSubsystem.getReplicas(partition);
             ArrayNode replicasArray = OBJECT_MAPPER.createArrayNode();
-            for (PartitionReplica replica : replicas) {
+            for (IPartitionReplica replica : replicas) {
                 final ObjectNode replicaJson = 
OBJECT_MAPPER.createObjectNode();
-                replicaJson.put("location", 
replica.getIdentifier().getLocation().toString());
+                final InetSocketAddress location = 
replica.getIdentifier().getLocation();
+                replicaJson.put("location", location.getHostString() + ":" + 
location.getPort());
                 replicaJson.put("status", replica.getStatus().toString());
                 replicasArray.add(replicaJson);
             }
@@ -135,7 +136,7 @@ public class StorageApiServlet extends AbstractServlet {
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             return;
         }
-        appCtx.getStorageSubsystem().addReplica(replicaIdentifier);
+        appCtx.getReplicaManager().addReplica(replicaIdentifier);
         response.setStatus(HttpResponseStatus.OK);
     }
 
@@ -145,7 +146,7 @@ public class StorageApiServlet extends AbstractServlet {
             response.setStatus(HttpResponseStatus.BAD_REQUEST);
             return;
         }
-        appCtx.getStorageSubsystem().removeReplica(replicaIdentifier);
+        appCtx.getReplicaManager().removeReplica(replicaIdentifier);
         response.setStatus(HttpResponseStatus.OK);
     }
 
@@ -156,7 +157,7 @@ public class StorageApiServlet extends AbstractServlet {
         if (partition == null || host == null || port == null) {
             return null;
         }
-        final InetSocketAddress replicaAddress = 
InetSocketAddress.createUnresolved(host, Integer.valueOf(port));
+        final InetSocketAddress replicaAddress = new InetSocketAddress(host, 
Integer.valueOf(port));
         return ReplicaIdentifier.of(Integer.valueOf(partition), 
replicaAddress);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index b6bf2df..75159af 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -61,7 +61,7 @@ import 
org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -142,8 +142,8 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     private final NCExtensionManager ncExtensionManager;
     private final IStorageComponentProvider componentProvider;
     private IHyracksClientConnection hcc;
-    private IStorageSubsystem storageSubsystem;
     private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+    private IReplicaManager replicaManager;
 
     public NCAppRuntimeContext(INCServiceContext ncServiceContext, 
List<AsterixExtension> extensions)
             throws AsterixException, InstantiationException, 
IllegalAccessException, ClassNotFoundException,
@@ -213,7 +213,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         final ClusterPartition[] nodePartitions = 
metadataProperties.getNodePartitions().get(nodeId);
         final Set<Integer> nodePartitionsIds = 
Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
                 .collect(Collectors.toSet());
-        storageSubsystem = new StorageSubsystem(nodePartitionsIds);
+        replicaManager = new ReplicaManager(nodePartitionsIds);
         isShuttingdown = false;
         activeManager = new ActiveManager(threadExecutor, 
getServiceContext().getNodeId(),
                 activeProperties.getMemoryComponentGlobalBudget(), 
compilerProperties.getFrameSize(),
@@ -528,8 +528,8 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
     }
 
     @Override
-    public IStorageSubsystem getStorageSubsystem() {
-        return storageSubsystem;
+    public IReplicaManager getReplicaManager() {
+        return replicaManager;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
new file mode 100644
index 0000000..0c84a6e
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.IReplicaManager;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.asterix.replication.storage.PartitionReplica;
+
+public class ReplicaManager implements IReplicaManager {
+
+    /**
+     * the partitions to which the current node is master
+     */
+    private final Set<Integer> partitions = new HashSet<>();
+    /**
+     * current replicas
+     */
+    private final Map<ReplicaIdentifier, PartitionReplica> replicas = new 
HashMap<>();
+
+    public ReplicaManager(Set<Integer> partitions) {
+        this.partitions.addAll(partitions);
+    }
+
+    @Override
+    public synchronized void addReplica(ReplicaIdentifier id) {
+        if (!partitions.contains(id.getPartition())) {
+            throw new IllegalStateException(
+                    "This node is not the current master of partition(" + 
id.getPartition() + ")");
+        }
+        replicas.computeIfAbsent(id, key -> new PartitionReplica(key));
+        replicas.get(id).sync();
+    }
+
+    @Override
+    public synchronized void removeReplica(ReplicaIdentifier id) {
+        if (!replicas.containsKey(id)) {
+            throw new IllegalStateException("replica with id(" + id + ") does 
not exist");
+        }
+        replicas.remove(id);
+    }
+
+    @Override
+    public List<IPartitionReplica> getReplicas(int partition) {
+        return replicas.entrySet().stream().filter(e -> 
e.getKey().getPartition() == partition).map(Map.Entry::getValue)
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Set<Integer> getPartitions() {
+        return Collections.unmodifiableSet(partitions);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
deleted file mode 100644
index 24aa376..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.nc;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.PartitionReplica;
-import org.apache.asterix.common.storage.ReplicaIdentifier;
-
-public class StorageSubsystem implements IStorageSubsystem {
-
-    /**
-     * the partitions to which the current node is master
-     */
-    private final Set<Integer> partitions = new HashSet<>();
-    /**
-     * current replicas
-     */
-    private final Map<ReplicaIdentifier, PartitionReplica> replicas = new 
HashMap<>();
-
-    public StorageSubsystem(Set<Integer> partitions) {
-        this.partitions.addAll(partitions);
-    }
-
-    @Override
-    public synchronized void addReplica(ReplicaIdentifier id) {
-        if (!partitions.contains(id.getPartition())) {
-            throw new IllegalStateException(
-                    "This node is not the current master of partition(" + 
id.getPartition() + ")");
-        }
-        replicas.computeIfAbsent(id, key -> new PartitionReplica(key));
-    }
-
-    @Override
-    public synchronized void removeReplica(ReplicaIdentifier id) {
-        if (!replicas.containsKey(id)) {
-            throw new IllegalStateException("replica with id(" + id + ") does 
not exist");
-        }
-        replicas.remove(id);
-    }
-
-    @Override
-    public List<PartitionReplica> getReplicas(int partition) {
-        return replicas.entrySet().stream().filter(e -> 
e.getKey().getPartition() == partition).map(Map.Entry::getValue)
-                .collect(Collectors.toList());
-    }
-
-    @Override
-    public Set<Integer> getPartitions() {
-        return Collections.unmodifiableSet(partitions);
-    }
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index b518f94..d840daf 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -57,6 +57,7 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Stream;
 
 import org.apache.asterix.app.external.IExternalUDFLibrarian;
 import org.apache.asterix.common.api.Duration;
@@ -129,6 +130,8 @@ public class TestExecutor {
 
     private static Method managixExecuteMethod = null;
     private static final HashMap<Integer, ITestServer> runningTestServers = 
new HashMap<>();
+    private static Map<String, InetSocketAddress> ncEndPoints;
+    private static Map<String, InetSocketAddress> replicationAddress;
 
     /*
      * Instance members
@@ -158,6 +161,14 @@ public class TestExecutor {
         this.librarian = librarian;
     }
 
+    public void setNcEndPoints(Map<String, InetSocketAddress> ncEndPoints) {
+        this.ncEndPoints = ncEndPoints;
+    }
+
+    public void setNcReplicationAddress(Map<String, InetSocketAddress> 
replicationAddress) {
+        this.replicationAddress = replicationAddress;
+    }
+
     /**
      * Probably does not work well with symlinks.
      */
@@ -1139,7 +1150,10 @@ public class TestExecutor {
                 // we only reach here if the loop is over
                 testLoops.remove(testFile);
                 break;
-
+            case "sto":
+                command = stripJavaComments(statement).trim().split(" ");
+                executeStorageCommand(command);
+                break;
             default:
                 throw new IllegalArgumentException("No statements of type " + 
ctx.getType());
         }
@@ -1510,15 +1524,26 @@ public class TestExecutor {
     }
 
     protected URI createEndpointURI(String path, String query) throws 
URISyntaxException {
-        int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
-        InetSocketAddress endpoint = endpoints.get(endpointIdx);
+        InetSocketAddress endpoint;
+        if (!path.startsWith("nc:")) {
+            int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
+            endpoint = endpoints.get(endpointIdx);
+        } else {
+            final String[] tokens = path.split(" ");
+            if (tokens.length != 2) {
+                throw new IllegalArgumentException("Unrecognized http 
pattern");
+            }
+            String nodeId = tokens[0].substring(3);
+            endpoint = getNcEndPoint(nodeId);
+            path = tokens[1];
+        }
         URI uri = new URI("http", null, endpoint.getHostString(), 
endpoint.getPort(), path, query, null);
         LOGGER.fine("Created endpoint URI: " + uri);
         return uri;
     }
 
     public URI getEndpoint(String servlet) throws URISyntaxException {
-        return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""), 
null);
+        return createEndpointURI(Servlets.getAbsolutePath(getPath(servlet)), 
null);
     }
 
     public static String stripJavaComments(String text) {
@@ -1622,6 +1647,41 @@ public class TestExecutor {
         LOGGER.info("Cluster state now " + desiredState);
     }
 
+    private void executeStorageCommand(String[] command) throws Exception {
+        String srcNode = command[0];
+        String api = command[1];
+        final URI endpoint = getEndpoint(srcNode + " " + 
Servlets.getAbsolutePath(Servlets.STORAGE) + api);
+        String partition = command[2];
+        String destNode = command[3];
+        final InetSocketAddress destAddress = 
getNcReplicationAddress(destNode);
+        List<Parameter> parameters = new ArrayList<>(3);
+        Stream.of("partition", "host", "port").forEach(arg -> {
+            Parameter p = new Parameter();
+            p.setName(arg);
+            parameters.add(p);
+        });
+        parameters.get(0).setValue(partition);
+        parameters.get(1).setValue(destAddress.getHostName());
+        parameters.get(2).setValue(String.valueOf(destAddress.getPort()));
+        final HttpUriRequest httpUriRequest = constructPostMethod(endpoint, 
parameters);
+        final HttpResponse httpResponse = executeHttpRequest(httpUriRequest);
+        Assert.assertEquals(HttpStatus.SC_OK, 
httpResponse.getStatusLine().getStatusCode());
+    }
+
+    private InetSocketAddress getNcEndPoint(String nodeId) {
+        if (ncEndPoints == null || !ncEndPoints.containsKey(nodeId)) {
+            throw new IllegalStateException("No end point specified for node: 
" + nodeId);
+        }
+        return ncEndPoints.get(nodeId);
+    }
+
+    private InetSocketAddress getNcReplicationAddress(String nodeId) {
+        if (replicationAddress == null || 
!replicationAddress.containsKey(nodeId)) {
+            throw new IllegalStateException("No replication address specified 
for node: " + nodeId);
+        }
+        return replicationAddress.get(nodeId);
+    }
+
     abstract static class TestLoop extends Exception {
 
         private final String target;

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
new file mode 100644
index 0000000..56c7bc0
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class ReplicationExecutionTest {
+    protected static final String TEST_CONFIG_FILE_NAME = 
"asterix-build-configuration.xml";
+    private static final TestExecutor testExecutor = new TestExecutor();
+
+    @BeforeClass
+    public static void setUp() throws Exception {
+        LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+        final NodeControllerService[] ncs = 
ExecutionTestUtil.integrationUtil.ncs;
+        Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+        Map<String, InetSocketAddress> replicationAddress = new HashMap<>();
+        final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+        for (NodeControllerService nc : ncs) {
+            final String nodeId = nc.getId();
+            final INcApplicationContext appCtx = (INcApplicationContext) 
nc.getApplicationContext();
+            int apiPort = appCtx.getExternalProperties().getNcApiPort();
+            int replicationPort = 
appCtx.getReplicationProperties().getDataReplicationPort(nodeId);
+            ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip, 
apiPort));
+            replicationAddress.put(nodeId, 
InetSocketAddress.createUnresolved(ip, replicationPort));
+        }
+        testExecutor.setNcEndPoints(ncEndPoints);
+        testExecutor.setNcReplicationAddress(replicationAddress);
+    }
+
+    @AfterClass
+    public static void tearDown() throws Exception {
+        LangExecutionUtil.tearDown();
+    }
+
+    @Parameters(name = "ReplicationExecutionTest {index}: {0}")
+    public static Collection<Object[]> tests() throws Exception {
+        return LangExecutionUtil.tests("replication.xml", "replication.xml");
+    }
+
+    protected TestCaseContext tcCtx;
+
+    public ReplicationExecutionTest(TestCaseContext tcCtx) {
+        this.tcCtx = tcCtx;
+    }
+
+    @Test
+    public void test() throws Exception {
+        LangExecutionUtil.test(tcCtx);
+    }
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
new file mode 100644
index 0000000..7ddaa20
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /addReplica 0 asterix_nc2
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
new file mode 100644
index 0000000..d287fad
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /admin/storage/partition/0

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml 
b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
new file mode 100644
index 0000000..a635676
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
@@ -0,0 +1,28 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements.  See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership.  The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License.  You may obtain a copy of the License at
+ !
+ !   http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied.  See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org" 
ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
+            QueryFileExtension=".sqlpp">
+  <test-group name="replication">
+    <test-case FilePath="replication">
+      <compilation-unit name="add_replica">
+        <output-dir compare="Text">add_replica</output-dir>
+      </compilation-unit>
+    </test-case>
+  </test-group>
+</test-suite>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
new file mode 100644
index 0000000..3553d9c
--- /dev/null
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
@@ -0,0 +1,7 @@
+[ {
+  "partition" : 0,
+  "replicas" : [ {
+    "location" : "127.0.0.1:2017",
+    "status" : "DISCONNECTED"
+  } ]
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 0503c09..28be6fa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -29,8 +29,7 @@ import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.application.INCServiceContext;
@@ -119,7 +118,7 @@ public interface INcApplicationContext extends IApplicationContext {
     @Override
     INCServiceContext getServiceContext();
 
-    IStorageSubsystem getStorageSubsystem();
-
     IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
+
+    IReplicaManager getReplicaManager();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
new file mode 100644
index 0000000..034d668
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
@@ -0,0 +1,26 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+package org.apache.asterix.common.exceptions;
+
+public class ReplicationException extends RuntimeException {
+
+    public ReplicationException(Throwable cause) {
+        super(cause);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
new file mode 100644
index 0000000..5a9dc3f
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+
+public interface IPartitionReplica {
+
+    enum PartitionReplicaStatus {
+        /* replica is in-sync with master */
+        IN_SYNC,
+        /* replica is still catching up with master */
+        CATCHING_UP,
+        /* replica is not connected with master */
+        DISCONNECTED
+    }
+
+    /**
+     * Gets the status of a replica.
+     *
+     * @return The status
+     */
+    PartitionReplicaStatus getStatus();
+
+    /**
+     * Gets the identifier of a replica
+     *
+     * @return The identifier
+     */
+    ReplicaIdentifier getIdentifier();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
new file mode 100644
index 0000000..a3b2b50
--- /dev/null
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.replication.IPartitionReplica;
+
+public interface IReplicaManager {
+
+    /**
+     * Adds a replica with the specified {@code id}
+     *
+     * @param id
+     */
+    void addReplica(ReplicaIdentifier id);
+
+    /**
+     * Removes the replica with the specified {@code id}
+     *
+     * @param id
+     */
+    void removeReplica(ReplicaIdentifier id);
+
+    /**
+     * The existing replicas of the partition {@code partition}
+     *
+     * @param partition
+     * @return The list of replicas
+     */
+    List<IPartitionReplica> getReplicas(int partition);
+
+    /**
+     * Gets the list of partition to which the current node is
+     * the master of.
+     *
+     * @return The list of partition
+     */
+    Set<Integer> getPartitions();
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
deleted file mode 100644
index b4f06cb..0000000
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.storage;
-
-import java.util.List;
-import java.util.Set;
-
-public interface IStorageSubsystem {
-
-    /**
-     * Adds a replica with the specified {@code id}
-     *
-     * @param id
-     */
-    void addReplica(ReplicaIdentifier id);
-
-    /**
-     * Removes the replica with the specified {@code id}
-     *
-     * @param id
-     */
-    void removeReplica(ReplicaIdentifier id);
-
-    /**
-     * The existing replicas of the partition {@code partition}
-     *
-     * @param partition
-     * @return The list of replicas
-     */
-    List<PartitionReplica> getReplicas(int partition);
-
-    /**
-     * Gets the list of partition to which the current node is
-     * the master of.
-     *
-     * @return The list of partition
-     */
-    Set<Integer> getPartitions();
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index d5f23bf..1ac3ffa 100644
--- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -46,4 +46,8 @@ public class Servlets {
 
     private Servlets() {
     }
+
+    public static String getAbsolutePath(String servlet) {
+        return servlet.replaceAll("/\\*$", "");
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-replication/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/pom.xml b/asterixdb/asterix-replication/pom.xml
index f209aae..2b5fe0c 100644
--- a/asterixdb/asterix-replication/pom.xml
+++ b/asterixdb/asterix-replication/pom.xml
@@ -71,6 +71,14 @@
       <artifactId>asterix-transactions</artifactId>
       <version>${project.version}</version>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+    </dependency>
   </dependencies>
 
 </project>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
new file mode 100644
index 0000000..c6d1b60
--- /dev/null
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.storage;
+
+import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.CATCHING_UP;
+import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED;
+import static org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC;
+
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+@ThreadSafe
+public class PartitionReplica implements IPartitionReplica {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private final ReplicaIdentifier id;
+    private PartitionReplicaStatus status = DISCONNECTED;
+
+    public PartitionReplica(ReplicaIdentifier id) {
+        this.id = id;
+    }
+
+    @Override
+    public synchronized PartitionReplicaStatus getStatus() {
+        return status;
+    }
+
+    @Override
+    public ReplicaIdentifier getIdentifier() {
+        return id;
+    }
+
+    public synchronized void sync() {
+        if (status == IN_SYNC || status == CATCHING_UP) {
+            return;
+        }
+    }
+
+    public JsonNode asJson() {
+        ObjectNode json = OBJECT_MAPPER.createObjectNode();
+        json.put("id", id.toString());
+        json.put("state", status.name());
+        return json;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PartitionReplica that = (PartitionReplica) o;
+        return id.equals(that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return id.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        try {
+            return JSONUtil.convertNode(asJson());
+        } catch (JsonProcessingException e) {
+            throw new ReplicationException(e);
+        }
+    }
+}
\ No newline at end of file

Reply via email to