Repository: asterixdb
Updated Branches:
refs/heads/master 929344e93 -> 214d3735d
[NO ISSUE][TEST] Add NC Storage API Test
- user model changes: no
- storage format changes: no
- interface changes: yes
- Add IPartitionReplica for use at the
API level.
- Rename IStorageSubsystem -> IReplicaManager
Details:
- Add option to TestExecutor to target a specific
NC endpoint.
- Add storage API test case.
Change-Id: I76c336a66e32036a34d30ce6d3a31195c342c4a9
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2195
Sonar-Qube: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Contrib: Jenkins <[email protected]>
Reviewed-by: Murtadha Hubail <[email protected]>
Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/214d3735
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/214d3735
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/214d3735
Branch: refs/heads/master
Commit: 214d3735d669e5e7ecc2c7b58e108f8d7da9a2cb
Parents: 929344e
Author: Murtadha Hubail <[email protected]>
Authored: Fri Dec 8 02:19:39 2017 +0300
Committer: Murtadha Hubail <[email protected]>
Committed: Thu Dec 7 19:22:46 2017 -0800
----------------------------------------------------------------------
.../api/http/server/StorageApiServlet.java | 19 ++--
.../asterix/app/nc/NCAppRuntimeContext.java | 10 +-
.../apache/asterix/app/nc/ReplicaManager.java | 77 ++++++++++++++++
.../apache/asterix/app/nc/StorageSubsystem.java | 75 ---------------
.../asterix/test/common/TestExecutor.java | 68 +++++++++++++-
.../test/runtime/ReplicationExecutionTest.java | 82 +++++++++++++++++
.../add_replica/add_replica.1.sto.cmd | 19 ++++
.../add_replica/add_replica.2.get.http | 19 ++++
.../test/resources/runtimets/replication.xml | 28 ++++++
.../replication/add_replica/add_replica.2.adm | 7 ++
.../common/api/INcApplicationContext.java | 7 +-
.../common/exceptions/ReplicationException.java | 26 ++++++
.../common/replication/IPartitionReplica.java | 47 ++++++++++
.../asterix/common/storage/IReplicaManager.java | 57 ++++++++++++
.../common/storage/IStorageSubsystem.java | 55 -----------
.../apache/asterix/common/utils/Servlets.java | 4 +
asterixdb/asterix-replication/pom.xml | 8 ++
.../replication/storage/PartitionReplica.java | 97 ++++++++++++++++++++
18 files changed, 553 insertions(+), 152 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
index d8636c8..8e73405 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/StorageApiServlet.java
@@ -30,8 +30,8 @@ import java.util.logging.Logger;
import java.util.stream.Collectors;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.PartitionReplica;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.storage.ReplicaIdentifier;
import org.apache.hyracks.http.api.IServletRequest;
import org.apache.hyracks.http.api.IServletResponse;
@@ -109,17 +109,18 @@ public class StorageApiServlet extends AbstractServlet {
private JsonNode getStatus(Predicate<Integer> predicate) {
final ArrayNode status = OBJECT_MAPPER.createArrayNode();
- final IStorageSubsystem storageSubsystem =
appCtx.getStorageSubsystem();
+ final IReplicaManager storageSubsystem = appCtx.getReplicaManager();
final Set<Integer> partitions =
storageSubsystem.getPartitions().stream().filter(predicate).collect(Collectors.toSet());
for (Integer partition : partitions) {
final ObjectNode partitionJson = OBJECT_MAPPER.createObjectNode();
partitionJson.put("partition", partition);
- final List<PartitionReplica> replicas =
storageSubsystem.getReplicas(partition);
+ final List<IPartitionReplica> replicas =
storageSubsystem.getReplicas(partition);
ArrayNode replicasArray = OBJECT_MAPPER.createArrayNode();
- for (PartitionReplica replica : replicas) {
+ for (IPartitionReplica replica : replicas) {
final ObjectNode replicaJson =
OBJECT_MAPPER.createObjectNode();
- replicaJson.put("location",
replica.getIdentifier().getLocation().toString());
+ final InetSocketAddress location =
replica.getIdentifier().getLocation();
+ replicaJson.put("location", location.getHostString() + ":" +
location.getPort());
replicaJson.put("status", replica.getStatus().toString());
replicasArray.add(replicaJson);
}
@@ -135,7 +136,7 @@ public class StorageApiServlet extends AbstractServlet {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
- appCtx.getStorageSubsystem().addReplica(replicaIdentifier);
+ appCtx.getReplicaManager().addReplica(replicaIdentifier);
response.setStatus(HttpResponseStatus.OK);
}
@@ -145,7 +146,7 @@ public class StorageApiServlet extends AbstractServlet {
response.setStatus(HttpResponseStatus.BAD_REQUEST);
return;
}
- appCtx.getStorageSubsystem().removeReplica(replicaIdentifier);
+ appCtx.getReplicaManager().removeReplica(replicaIdentifier);
response.setStatus(HttpResponseStatus.OK);
}
@@ -156,7 +157,7 @@ public class StorageApiServlet extends AbstractServlet {
if (partition == null || host == null || port == null) {
return null;
}
- final InetSocketAddress replicaAddress =
InetSocketAddress.createUnresolved(host, Integer.valueOf(port));
+ final InetSocketAddress replicaAddress = new InetSocketAddress(host,
Integer.valueOf(port));
return ReplicaIdentifier.of(Integer.valueOf(partition),
replicaAddress);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index b6bf2df..75159af 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -61,7 +61,7 @@ import
org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.storage.IStorageSubsystem;
+import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
@@ -142,8 +142,8 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
private final NCExtensionManager ncExtensionManager;
private final IStorageComponentProvider componentProvider;
private IHyracksClientConnection hcc;
- private IStorageSubsystem storageSubsystem;
private IIndexCheckpointManagerProvider indexCheckpointManagerProvider;
+ private IReplicaManager replicaManager;
public NCAppRuntimeContext(INCServiceContext ncServiceContext,
List<AsterixExtension> extensions)
throws AsterixException, InstantiationException,
IllegalAccessException, ClassNotFoundException,
@@ -213,7 +213,7 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
final ClusterPartition[] nodePartitions =
metadataProperties.getNodePartitions().get(nodeId);
final Set<Integer> nodePartitionsIds =
Arrays.stream(nodePartitions).map(ClusterPartition::getPartitionId)
.collect(Collectors.toSet());
- storageSubsystem = new StorageSubsystem(nodePartitionsIds);
+ replicaManager = new ReplicaManager(nodePartitionsIds);
isShuttingdown = false;
activeManager = new ActiveManager(threadExecutor,
getServiceContext().getNodeId(),
activeProperties.getMemoryComponentGlobalBudget(),
compilerProperties.getFrameSize(),
@@ -528,8 +528,8 @@ public class NCAppRuntimeContext implements
INcApplicationContext {
}
@Override
- public IStorageSubsystem getStorageSubsystem() {
- return storageSubsystem;
+ public IReplicaManager getReplicaManager() {
+ return replicaManager;
}
@Override
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
new file mode 100644
index 0000000..0c84a6e
--- /dev/null
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.app.nc;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.IReplicaManager;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.asterix.replication.storage.PartitionReplica;
+
+public class ReplicaManager implements IReplicaManager {
+
+ /**
+ * the partitions to which the current node is master
+ */
+ private final Set<Integer> partitions = new HashSet<>();
+ /**
+ * current replicas
+ */
+ private final Map<ReplicaIdentifier, PartitionReplica> replicas = new
HashMap<>();
+
+ public ReplicaManager(Set<Integer> partitions) {
+ this.partitions.addAll(partitions);
+ }
+
+ @Override
+ public synchronized void addReplica(ReplicaIdentifier id) {
+ if (!partitions.contains(id.getPartition())) {
+ throw new IllegalStateException(
+ "This node is not the current master of partition(" +
id.getPartition() + ")");
+ }
+ replicas.computeIfAbsent(id, key -> new PartitionReplica(key));
+ replicas.get(id).sync();
+ }
+
+ @Override
+ public synchronized void removeReplica(ReplicaIdentifier id) {
+ if (!replicas.containsKey(id)) {
+ throw new IllegalStateException("replica with id(" + id + ") does
not exist");
+ }
+ replicas.remove(id);
+ }
+
+ @Override
+ public List<IPartitionReplica> getReplicas(int partition) {
+ return replicas.entrySet().stream().filter(e ->
e.getKey().getPartition() == partition).map(Map.Entry::getValue)
+ .collect(Collectors.toList());
+ }
+
+ @Override
+ public Set<Integer> getPartitions() {
+ return Collections.unmodifiableSet(partitions);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
deleted file mode 100644
index 24aa376..0000000
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/StorageSubsystem.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.app.nc;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.PartitionReplica;
-import org.apache.asterix.common.storage.ReplicaIdentifier;
-
-public class StorageSubsystem implements IStorageSubsystem {
-
- /**
- * the partitions to which the current node is master
- */
- private final Set<Integer> partitions = new HashSet<>();
- /**
- * current replicas
- */
- private final Map<ReplicaIdentifier, PartitionReplica> replicas = new
HashMap<>();
-
- public StorageSubsystem(Set<Integer> partitions) {
- this.partitions.addAll(partitions);
- }
-
- @Override
- public synchronized void addReplica(ReplicaIdentifier id) {
- if (!partitions.contains(id.getPartition())) {
- throw new IllegalStateException(
- "This node is not the current master of partition(" +
id.getPartition() + ")");
- }
- replicas.computeIfAbsent(id, key -> new PartitionReplica(key));
- }
-
- @Override
- public synchronized void removeReplica(ReplicaIdentifier id) {
- if (!replicas.containsKey(id)) {
- throw new IllegalStateException("replica with id(" + id + ") does
not exist");
- }
- replicas.remove(id);
- }
-
- @Override
- public List<PartitionReplica> getReplicas(int partition) {
- return replicas.entrySet().stream().filter(e ->
e.getKey().getPartition() == partition).map(Map.Entry::getValue)
- .collect(Collectors.toList());
- }
-
- @Override
- public Set<Integer> getPartitions() {
- return Collections.unmodifiableSet(partitions);
- }
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
index b518f94..d840daf 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/common/TestExecutor.java
@@ -57,6 +57,7 @@ import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import java.util.stream.Stream;
import org.apache.asterix.app.external.IExternalUDFLibrarian;
import org.apache.asterix.common.api.Duration;
@@ -129,6 +130,8 @@ public class TestExecutor {
private static Method managixExecuteMethod = null;
private static final HashMap<Integer, ITestServer> runningTestServers =
new HashMap<>();
+ private static Map<String, InetSocketAddress> ncEndPoints;
+ private static Map<String, InetSocketAddress> replicationAddress;
/*
* Instance members
@@ -158,6 +161,14 @@ public class TestExecutor {
this.librarian = librarian;
}
+ public void setNcEndPoints(Map<String, InetSocketAddress> ncEndPoints) {
+ this.ncEndPoints = ncEndPoints;
+ }
+
+ public void setNcReplicationAddress(Map<String, InetSocketAddress>
replicationAddress) {
+ this.replicationAddress = replicationAddress;
+ }
+
/**
* Probably does not work well with symlinks.
*/
@@ -1139,7 +1150,10 @@ public class TestExecutor {
// we only reach here if the loop is over
testLoops.remove(testFile);
break;
-
+ case "sto":
+ command = stripJavaComments(statement).trim().split(" ");
+ executeStorageCommand(command);
+ break;
default:
throw new IllegalArgumentException("No statements of type " +
ctx.getType());
}
@@ -1510,15 +1524,26 @@ public class TestExecutor {
}
protected URI createEndpointURI(String path, String query) throws
URISyntaxException {
- int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
- InetSocketAddress endpoint = endpoints.get(endpointIdx);
+ InetSocketAddress endpoint;
+ if (!path.startsWith("nc:")) {
+ int endpointIdx = Math.abs(endpointSelector++ % endpoints.size());
+ endpoint = endpoints.get(endpointIdx);
+ } else {
+ final String[] tokens = path.split(" ");
+ if (tokens.length != 2) {
+ throw new IllegalArgumentException("Unrecognized http
pattern");
+ }
+ String nodeId = tokens[0].substring(3);
+ endpoint = getNcEndPoint(nodeId);
+ path = tokens[1];
+ }
URI uri = new URI("http", null, endpoint.getHostString(),
endpoint.getPort(), path, query, null);
LOGGER.fine("Created endpoint URI: " + uri);
return uri;
}
public URI getEndpoint(String servlet) throws URISyntaxException {
- return createEndpointURI(getPath(servlet).replaceAll("/\\*$", ""),
null);
+ return createEndpointURI(Servlets.getAbsolutePath(getPath(servlet)),
null);
}
public static String stripJavaComments(String text) {
@@ -1622,6 +1647,41 @@ public class TestExecutor {
LOGGER.info("Cluster state now " + desiredState);
}
+ private void executeStorageCommand(String[] command) throws Exception {
+ String srcNode = command[0];
+ String api = command[1];
+ final URI endpoint = getEndpoint(srcNode + " " +
Servlets.getAbsolutePath(Servlets.STORAGE) + api);
+ String partition = command[2];
+ String destNode = command[3];
+ final InetSocketAddress destAddress =
getNcReplicationAddress(destNode);
+ List<Parameter> parameters = new ArrayList<>(3);
+ Stream.of("partition", "host", "port").forEach(arg -> {
+ Parameter p = new Parameter();
+ p.setName(arg);
+ parameters.add(p);
+ });
+ parameters.get(0).setValue(partition);
+ parameters.get(1).setValue(destAddress.getHostName());
+ parameters.get(2).setValue(String.valueOf(destAddress.getPort()));
+ final HttpUriRequest httpUriRequest = constructPostMethod(endpoint,
parameters);
+ final HttpResponse httpResponse = executeHttpRequest(httpUriRequest);
+ Assert.assertEquals(HttpStatus.SC_OK,
httpResponse.getStatusLine().getStatusCode());
+ }
+
+ private InetSocketAddress getNcEndPoint(String nodeId) {
+ if (ncEndPoints == null || !ncEndPoints.containsKey(nodeId)) {
+ throw new IllegalStateException("No end point specified for node:
" + nodeId);
+ }
+ return ncEndPoints.get(nodeId);
+ }
+
+ private InetSocketAddress getNcReplicationAddress(String nodeId) {
+ if (replicationAddress == null ||
!replicationAddress.containsKey(nodeId)) {
+ throw new IllegalStateException("No replication address specified
for node: " + nodeId);
+ }
+ return replicationAddress.get(nodeId);
+ }
+
abstract static class TestLoop extends Exception {
private final String target;
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
new file mode 100644
index 0000000..56c7bc0
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/test/runtime/ReplicationExecutionTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.test.runtime;
+
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.test.common.TestExecutor;
+import org.apache.asterix.testframework.context.TestCaseContext;
+import org.apache.hyracks.control.nc.NodeControllerService;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+@RunWith(Parameterized.class)
+public class ReplicationExecutionTest {
+ protected static final String TEST_CONFIG_FILE_NAME =
"asterix-build-configuration.xml";
+ private static final TestExecutor testExecutor = new TestExecutor();
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ LangExecutionUtil.setUp(TEST_CONFIG_FILE_NAME, testExecutor);
+ final NodeControllerService[] ncs =
ExecutionTestUtil.integrationUtil.ncs;
+ Map<String, InetSocketAddress> ncEndPoints = new HashMap<>();
+ Map<String, InetSocketAddress> replicationAddress = new HashMap<>();
+ final String ip = InetAddress.getLoopbackAddress().getHostAddress();
+ for (NodeControllerService nc : ncs) {
+ final String nodeId = nc.getId();
+ final INcApplicationContext appCtx = (INcApplicationContext)
nc.getApplicationContext();
+ int apiPort = appCtx.getExternalProperties().getNcApiPort();
+ int replicationPort =
appCtx.getReplicationProperties().getDataReplicationPort(nodeId);
+ ncEndPoints.put(nodeId, InetSocketAddress.createUnresolved(ip,
apiPort));
+ replicationAddress.put(nodeId,
InetSocketAddress.createUnresolved(ip, replicationPort));
+ }
+ testExecutor.setNcEndPoints(ncEndPoints);
+ testExecutor.setNcReplicationAddress(replicationAddress);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ LangExecutionUtil.tearDown();
+ }
+
+ @Parameters(name = "ReplicationExecutionTest {index}: {0}")
+ public static Collection<Object[]> tests() throws Exception {
+ return LangExecutionUtil.tests("replication.xml", "replication.xml");
+ }
+
+ protected TestCaseContext tcCtx;
+
+ public ReplicationExecutionTest(TestCaseContext tcCtx) {
+ this.tcCtx = tcCtx;
+ }
+
+ @Test
+ public void test() throws Exception {
+ LangExecutionUtil.test(tcCtx);
+ }
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
new file mode 100644
index 0000000..7ddaa20
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.1.sto.cmd
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /addReplica 0 asterix_nc2
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
new file mode 100644
index 0000000..d287fad
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/replication/add_replica/add_replica.2.get.http
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+nc:asterix_nc1 /admin/storage/partition/0
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
new file mode 100644
index 0000000..a635676
--- /dev/null
+++ b/asterixdb/asterix-app/src/test/resources/runtimets/replication.xml
@@ -0,0 +1,28 @@
+<!--
+ ! Licensed to the Apache Software Foundation (ASF) under one
+ ! or more contributor license agreements. See the NOTICE file
+ ! distributed with this work for additional information
+ ! regarding copyright ownership. The ASF licenses this file
+ ! to you under the Apache License, Version 2.0 (the
+ ! "License"); you may not use this file except in compliance
+ ! with the License. You may obtain a copy of the License at
+ !
+ ! http://www.apache.org/licenses/LICENSE-2.0
+ !
+ ! Unless required by applicable law or agreed to in writing,
+ ! software distributed under the License is distributed on an
+ ! "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ! KIND, either express or implied. See the License for the
+ ! specific language governing permissions and limitations
+ ! under the License.
+ !-->
+<test-suite xmlns="urn:xml.testframework.asterix.apache.org"
ResultOffsetPath="results" QueryOffsetPath="queries_sqlpp"
+ QueryFileExtension=".sqlpp">
+ <test-group name="replication">
+ <test-case FilePath="replication">
+ <compilation-unit name="add_replica">
+ <output-dir compare="Text">add_replica</output-dir>
+ </compilation-unit>
+ </test-case>
+ </test-group>
+</test-suite>
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
new file mode 100644
index 0000000..3553d9c
--- /dev/null
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/results/replication/add_replica/add_replica.2.adm
@@ -0,0 +1,7 @@
+[ {
+ "partition" : 0,
+ "replicas" : [ {
+ "location" : "127.0.0.1:2017",
+ "status" : "DISCONNECTED"
+ } ]
+} ]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
index 0503c09..28be6fa 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/api/INcApplicationContext.java
@@ -29,8 +29,7 @@ import
org.apache.asterix.common.replication.IReplicaResourcesManager;
import org.apache.asterix.common.replication.IReplicationChannel;
import org.apache.asterix.common.replication.IReplicationManager;
import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.storage.IStorageSubsystem;
-import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IReplicaManager;
import org.apache.asterix.common.transactions.ITransactionSubsystem;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.application.INCServiceContext;
@@ -119,7 +118,7 @@ public interface INcApplicationContext extends
IApplicationContext {
@Override
INCServiceContext getServiceContext();
- IStorageSubsystem getStorageSubsystem();
-
IIndexCheckpointManagerProvider getIndexCheckpointManagerProvider();
+
+ IReplicaManager getReplicaManager();
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
new file mode 100644
index 0000000..034d668
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ReplicationException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.exceptions;
+
+/**
+ * Unchecked exception used to report a failure in replication-related code.
+ * It wraps the underlying cause so the original stack trace is preserved.
+ */
+public class ReplicationException extends RuntimeException {
+
+    // Explicit version id: the class is Serializable through RuntimeException.
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception that wraps {@code cause}.
+     *
+     * @param cause
+     *            the failure that triggered this exception
+     */
+    public ReplicationException(Throwable cause) {
+        super(cause);
+    }
+
+    /**
+     * Creates an exception with a context message that wraps {@code cause}.
+     *
+     * @param message
+     *            a description of what was being attempted
+     * @param cause
+     *            the failure that triggered this exception
+     */
+    public ReplicationException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
new file mode 100644
index 0000000..5a9dc3f
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/replication/IPartitionReplica.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.replication;
+
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+
+/**
+ * A replica of a partition, described by its identifier and its
+ * status relative to the master.
+ */
+public interface IPartitionReplica {
+
+ /**
+ * The possible states of a replica with respect to the master.
+ */
+ enum PartitionReplicaStatus {
+ /* replica is in-sync with master */
+ IN_SYNC,
+ /* replica is still catching up with master */
+ CATCHING_UP,
+ /* replica is not connected with master */
+ DISCONNECTED
+ }
+
+ /**
+ * Gets the current status of this replica.
+ *
+ * @return The status
+ */
+ PartitionReplicaStatus getStatus();
+
+ /**
+ * Gets the identifier of this replica.
+ *
+ * @return The identifier
+ */
+ ReplicaIdentifier getIdentifier();
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
new file mode 100644
index 0000000..a3b2b50
--- /dev/null
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.common.storage;
+
+import java.util.List;
+import java.util.Set;
+
+import org.apache.asterix.common.replication.IPartitionReplica;
+
+/**
+ * Manages the replicas of partitions: replicas can be added, removed
+ * and listed per partition.
+ */
+public interface IReplicaManager {
+
+ /**
+ * Adds a replica with the specified {@code id}
+ *
+ * @param id
+ *            the identifier of the replica to add
+ */
+ void addReplica(ReplicaIdentifier id);
+
+ /**
+ * Removes the replica with the specified {@code id}
+ *
+ * @param id
+ *            the identifier of the replica to remove
+ */
+ void removeReplica(ReplicaIdentifier id);
+
+ /**
+ * Gets the existing replicas of the partition {@code partition}
+ *
+ * @param partition
+ *            the partition whose replicas are requested
+ * @return The list of replicas
+ */
+ List<IPartitionReplica> getReplicas(int partition);
+
+ /**
+ * Gets the set of partitions for which the current node is
+ * the master.
+ *
+ * @return The set of partitions
+ */
+ Set<Integer> getPartitions();
+}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
deleted file mode 100644
index b4f06cb..0000000
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IStorageSubsystem.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.common.storage;
-
-import java.util.List;
-import java.util.Set;
-
-public interface IStorageSubsystem {
-
- /**
- * Adds a replica with the specified {@code id}
- *
- * @param id
- */
- void addReplica(ReplicaIdentifier id);
-
- /**
- * Removes the replica with the specified {@code id}
- *
- * @param id
- */
- void removeReplica(ReplicaIdentifier id);
-
- /**
- * The existing replicas of the partition {@code partition}
- *
- * @param partition
- * @return The list of replicas
- */
- List<PartitionReplica> getReplicas(int partition);
-
- /**
- * Gets the list of partition to which the current node is
- * the master of.
- *
- * @return The list of partition
- */
- Set<Integer> getPartitions();
-}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
index d5f23bf..1ac3ffa 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/Servlets.java
@@ -46,4 +46,8 @@ public class Servlets {
private Servlets() {
}
+
+ /**
+  * Strips a trailing {@code /*} wildcard from a servlet path
+  * specification; a path without that suffix is returned unchanged.
+  *
+  * @param servlet
+  *            the servlet path specification
+  * @return the path without the trailing wildcard
+  */
+ public static String getAbsolutePath(String servlet) {
+     // The pattern is anchored at the end, so it can match at most once.
+     final String trailingWildcard = "/\\*$";
+     String withoutWildcard = servlet.replaceFirst(trailingWildcard, "");
+     return withoutWildcard;
+ }
}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-replication/pom.xml
----------------------------------------------------------------------
diff --git a/asterixdb/asterix-replication/pom.xml
b/asterixdb/asterix-replication/pom.xml
index f209aae..2b5fe0c 100644
--- a/asterixdb/asterix-replication/pom.xml
+++ b/asterixdb/asterix-replication/pom.xml
@@ -71,6 +71,14 @@
<artifactId>asterix-transactions</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/214d3735/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
----------------------------------------------------------------------
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
new file mode 100644
index 0000000..c6d1b60
--- /dev/null
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/storage/PartitionReplica.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.replication.storage;
+
+import static
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.CATCHING_UP;
+import static
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.DISCONNECTED;
+import static
org.apache.asterix.common.replication.IPartitionReplica.PartitionReplicaStatus.IN_SYNC;
+
+import java.nio.channels.SocketChannel;
+
+import org.apache.asterix.common.exceptions.ReplicationException;
+import org.apache.asterix.common.replication.IPartitionReplica;
+import org.apache.asterix.common.storage.ReplicaIdentifier;
+import org.apache.hyracks.util.JSONUtil;
+import org.apache.hyracks.util.annotations.ThreadSafe;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
+/**
+ * Tracks a single partition replica: its immutable identifier and its current
+ * {@link PartitionReplicaStatus}. A new instance starts as {@code DISCONNECTED}.
+ * <p>
+ * The mutable {@code status} field is only accessed while holding this
+ * object's monitor.
+ */
+@ThreadSafe
+public class PartitionReplica implements IPartitionReplica {
+
+    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    private final ReplicaIdentifier id;
+    private PartitionReplicaStatus status = DISCONNECTED;
+
+    /**
+     * @param id
+     *            the identifier of this replica
+     */
+    public PartitionReplica(ReplicaIdentifier id) {
+        this.id = id;
+    }
+
+    @Override
+    public synchronized PartitionReplicaStatus getStatus() {
+        return status;
+    }
+
+    @Override
+    public ReplicaIdentifier getIdentifier() {
+        return id;
+    }
+
+    /**
+     * Returns immediately when the replica is already {@code IN_SYNC} or
+     * {@code CATCHING_UP}. For any other status it currently falls through
+     * without doing anything: this version contains no synchronization logic.
+     */
+    public synchronized void sync() {
+        if (status == IN_SYNC || status == CATCHING_UP) {
+            return;
+        }
+    }
+
+    /**
+     * Builds a JSON object with two string fields: {@code "id"} (the
+     * identifier's {@code toString()}) and {@code "state"} (the status name).
+     *
+     * @return the JSON representation of this replica
+     */
+    public JsonNode asJson() {
+        ObjectNode json = OBJECT_MAPPER.createObjectNode();
+        json.put("id", id.toString());
+        // Read the status through the synchronized getter so this access is
+        // guarded by the same monitor as every other access to the field.
+        json.put("state", getStatus().name());
+        return json;
+    }
+
+    /**
+     * Two replicas are equal when they are of the same class and have equal
+     * identifiers; the status does not take part in equality.
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        PartitionReplica that = (PartitionReplica) o;
+        return id.equals(that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return id.hashCode();
+    }
+
+    /**
+     * Returns {@link #asJson()} converted to a string by
+     * {@code JSONUtil.convertNode}.
+     *
+     * @throws ReplicationException
+     *             if the conversion fails with a {@link JsonProcessingException}
+     */
+    @Override
+    public String toString() {
+        try {
+            return JSONUtil.convertNode(asJson());
+        } catch (JsonProcessingException e) {
+            throw new ReplicationException(e);
+        }
+    }
+}
\ No newline at end of file