Repository: asterixdb
Updated Branches:
  refs/heads/master ca8d0830e -> 15924f3cc
[NO ISSUE] Add support for NodeGroup upsert, etc.

Also, enable resource id generation when only pending removal nodes are
absent from the cluster

Change-Id: I15cfb74bc345680102cedafa99f7ff4f144860bc
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2389
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Murtadha Hubail <mhub...@apache.org>
Integration-Tests: Murtadha Hubail <mhub...@apache.org>
Tested-by: Murtadha Hubail <mhub...@apache.org>

Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/15924f3c
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/15924f3c
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/15924f3c

Branch: refs/heads/master
Commit: 15924f3cc5735a8d88ab1b9da1384691cc48a95b
Parents: ca8d083
Author: Michael Blow <mb...@apache.org>
Authored: Wed Feb 14 20:34:10 2018 -0500
Committer: Michael Blow <mb...@apache.org>
Committed: Wed Feb 14 18:36:11 2018 -0800

----------------------------------------------------------------------
 .../api/http/server/ClusterApiServlet.java | 2 +-
 .../common/cluster/IClusterStateManager.java | 2 +-
 .../apache/asterix/metadata/MetadataCache.java | 8 ++------
 .../apache/asterix/metadata/MetadataManager.java | 13 ++++++++++++-
 .../org/apache/asterix/metadata/MetadataNode.java | 5 +++--
 .../metadata/MetadataTransactionContext.java | 2 +-
 .../asterix/metadata/api/IMetadataManager.java | 15 +++++++++++++--
 .../asterix/metadata/api/IMetadataNode.java | 6 +++++-
 .../asterix/metadata/entities/NodeGroup.java | 2 +-
 .../runtime/message/ResourceIdRequestMessage.java | 18 +++++++++---------
 .../runtime/transaction/ResourceIdManager.java | 3 ++-
 .../runtime/utils/ClusterStateManager.java | 3 ++-
 .../hyracks/control/cc/cluster/NodeManager.java | 4 ++--
 13 files changed, 54 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java index d239038..a3ad089 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/http/server/ClusterApiServlet.java @@ -163,7 +163,7 @@ public class ClusterApiServlet extends AbstractServlet { private void processPartitionMaster(IServletRequest request, IServletResponse response) { final String partition = request.getParameter("partition"); final String node = request.getParameter("node"); - appCtx.getClusterStateManager().updateClusterPartition(Integer.valueOf(partition), node, true); + appCtx.getClusterStateManager().updateClusterPartition(Integer.parseInt(partition), node, true); response.setStatus(HttpResponseStatus.OK); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java index 0a5707e..dda9ffd 100644 --- a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java +++ b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java @@ -66,7 +66,7 @@ public interface IClusterStateManager { /** * Updates the active node and active state of the cluster partition 
with id {@code partitionNum} */ - void updateClusterPartition(Integer partitionNum, String activeNode, boolean active); + void updateClusterPartition(int partitionNum, String activeNode, boolean active); /** * Updates the metadata node id and its state. http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java index a12b079..b994c50 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataCache.java @@ -21,7 +21,6 @@ package org.apache.asterix.metadata; import java.util.ArrayList; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; @@ -191,12 +190,9 @@ public class MetadataCache { } } - public NodeGroup addNodeGroupIfNotExists(NodeGroup nodeGroup) { + public NodeGroup addOrUpdateNodeGroup(NodeGroup nodeGroup) { synchronized (nodeGroups) { - if (!nodeGroups.containsKey(nodeGroup.getNodeGroupName())) { - return nodeGroups.put(nodeGroup.getNodeGroupName(), nodeGroup); - } - return null; + return nodeGroups.put(nodeGroup.getNodeGroupName(), nodeGroup); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java index b2757f2..a5e3a84 100644 --- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataManager.java @@ -57,6 +57,7 @@ import org.apache.asterix.metadata.entities.Library; import org.apache.asterix.metadata.entities.Node; import org.apache.asterix.metadata.entities.NodeGroup; import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback.Operation; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -480,8 +481,18 @@ public abstract class MetadataManager implements IMetadataManager { @Override public void addNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException { + modifyNodegroup(ctx, nodeGroup, Operation.INSERT); + } + + @Override + public void upsertNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException { + modifyNodegroup(ctx, nodeGroup, Operation.UPSERT); + } + + public void modifyNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup, Operation op) + throws AlgebricksException { try { - metadataNode.addNodeGroup(ctx.getTxnId(), nodeGroup); + metadataNode.modifyNodeGroup(ctx.getTxnId(), nodeGroup, op); } catch (RemoteException e) { throw new MetadataException(ErrorCode.REMOTE_EXCEPTION_WHEN_CALLING_METADATA_NODE, e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java index 966c99a..64d0389 100644 --- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataNode.java @@ -404,11 +404,12 @@ public class MetadataNode implements IMetadataNode { } @Override - public void addNodeGroup(TxnId txnId, NodeGroup nodeGroup) throws AlgebricksException, RemoteException { + public void modifyNodeGroup(TxnId txnId, NodeGroup nodeGroup, Operation modificationOp) + throws AlgebricksException, RemoteException { try { NodeGroupTupleTranslator tupleReaderWriter = tupleTranslatorProvider.getNodeGroupTupleTranslator(true); ITupleReference tuple = tupleReaderWriter.getTupleFromMetadataEntity(nodeGroup); - insertTupleIntoIndex(txnId, MetadataPrimaryIndexes.NODEGROUP_DATASET, tuple); + modifyMetadataIndex(modificationOp, txnId, MetadataPrimaryIndexes.NODEGROUP_DATASET, tuple); } catch (HyracksDataException e) { if (e.getComponent().equals(ErrorCode.HYRACKS) && e.getErrorCode() == ErrorCode.DUPLICATE_KEY) { throw new AlgebricksException( http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java index cb67ee5..961a1ee 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/MetadataTransactionContext.java @@ -152,7 +152,7 @@ public class MetadataTransactionContext extends MetadataCache { public void dropNodeGroup(String nodeGroupName) { NodeGroup nodeGroup = new NodeGroup(nodeGroupName, null); - droppedCache.addNodeGroupIfNotExists(nodeGroup); + 
droppedCache.addOrUpdateNodeGroup(nodeGroup); logAndApply(new MetadataLogicalOperation(nodeGroup, false)); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java index e030db3..ff349e1 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataManager.java @@ -320,18 +320,29 @@ public interface IMetadataManager extends IMetadataBootstrap { throws AlgebricksException; /** - * Inserts a node group. + * Inserts a new node group. * * @param ctx * MetadataTransactionContext of an active metadata transaction. * @param nodeGroup * Node group instance to insert. * @throws AlgebricksException - * For example, if the node group already exists. + * For example, if the node group already exists */ void addNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException; /** + * Inserts a new (or updates an existing) node group. + * + * @param ctx + * MetadataTransactionContext of an active metadata transaction. + * @param nodeGroup + * Node group instance to insert or update. + * @throws AlgebricksException + */ + void upsertNodegroup(MetadataTransactionContext ctx, NodeGroup nodeGroup) throws AlgebricksException; + + /** * Retrieves a node group. 
* * @param ctx http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java index f6abc53..dc23db4 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/api/IMetadataNode.java @@ -41,6 +41,7 @@ import org.apache.asterix.metadata.entities.Index; import org.apache.asterix.metadata.entities.Library; import org.apache.asterix.metadata.entities.Node; import org.apache.asterix.metadata.entities.NodeGroup; +import org.apache.asterix.transaction.management.opcallbacks.AbstractIndexModificationOperationCallback; import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; /** @@ -340,11 +341,14 @@ public interface IMetadataNode extends Remote, Serializable { * A globally unique id for an active metadata transaction. * @param nodeGroup * Node group instance to insert. + * @param modificationOp * @throws AlgebricksException * For example, if the node group already exists. 
* @throws RemoteException */ - void addNodeGroup(TxnId txnId, NodeGroup nodeGroup) throws AlgebricksException, RemoteException; + void modifyNodeGroup(TxnId txnId, NodeGroup nodeGroup, + AbstractIndexModificationOperationCallback.Operation modificationOp) + throws AlgebricksException, RemoteException; /** * Retrieves a node group, acquiring local locks on behalf of the given http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java index e5088aa..8a7d5ec 100644 --- a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java +++ b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/entities/NodeGroup.java @@ -50,7 +50,7 @@ public class NodeGroup implements IMetadataEntity<NodeGroup> { @Override public NodeGroup addToCache(MetadataCache cache) { - return cache.addNodeGroupIfNotExists(this); + return cache.addOrUpdateNodeGroup(this); } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java index 3e172c7..087913f 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/message/ResourceIdRequestMessage.java @@ 
-41,16 +41,16 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage { ICCMessageBroker broker = (ICCMessageBroker) appCtx.getServiceContext().getMessageBroker(); ResourceIdRequestResponseMessage response = new ResourceIdRequestResponseMessage(); IClusterStateManager clusterStateManager = appCtx.getClusterStateManager(); - if (!clusterStateManager.isClusterActive()) { - response.setResourceId(-1); - response.setException(new Exception("Cannot generate global resource id when cluster is not active.")); - } else { - IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); - response.setResourceId(resourceIdManager.createResourceId()); - if (response.getResourceId() < 0) { + IResourceIdManager resourceIdManager = appCtx.getResourceIdManager(); + response.setResourceId(resourceIdManager.createResourceId()); + if (response.getResourceId() < 0) { + if (!(clusterStateManager.isClusterActive())) { + response.setException( + new Exception("Cannot generate global resource id when cluster is not active.")); + } else { response.setException(new Exception("One or more nodes has not reported max resource id.")); + requestMaxResourceID(clusterStateManager, resourceIdManager, broker); } - requestMaxResourceID(clusterStateManager, resourceIdManager, broker); } broker.sendApplicationMessageToNC(response, src); } catch (Exception e) { @@ -60,7 +60,7 @@ public class ResourceIdRequestMessage implements ICcAddressedMessage { private void requestMaxResourceID(IClusterStateManager clusterStateManager, IResourceIdManager resourceIdManager, ICCMessageBroker broker) throws Exception { - Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes(); + Set<String> getParticipantNodes = clusterStateManager.getParticipantNodes(true); ReportLocalCountersRequestMessage msg = new ReportLocalCountersRequestMessage(); for (String nodeId : getParticipantNodes) { if (!resourceIdManager.reported(nodeId)) { 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java index 0bb862d..6d3077e 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/transaction/ResourceIdManager.java @@ -38,7 +38,8 @@ public class ResourceIdManager implements IResourceIdManager { @Override public long createResourceId() { - return csm.isClusterActive() ? globalResourceId.incrementAndGet() : -1; + return csm.isClusterActive() || reportedNodes.containsAll(csm.getParticipantNodes(true)) + ? globalResourceId.incrementAndGet() : -1; } @Override http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java index 76668d2..03a6868 100644 --- a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java +++ b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java @@ -150,7 +150,7 @@ public class ClusterStateManager implements IClusterStateManager { } @Override - public synchronized void updateClusterPartition(Integer partitionNum, String activeNode, boolean active) { + public synchronized void updateClusterPartition(int 
partitionNum, String activeNode, boolean active) { ClusterPartition clusterPartition = clusterPartitions.get(partitionNum); if (clusterPartition != null) { // set the active node for this node's partitions @@ -159,6 +159,7 @@ public class ClusterStateManager implements IClusterStateManager { clusterPartition.setActiveNodeId(activeNode); clusterPartition.setPendingActivation(false); } + notifyAll(); } } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/15924f3c/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java ---------------------------------------------------------------------- diff --git a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java index 8f73864..77e7bf7 100644 --- a/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java +++ b/hyracks-fullstack/hyracks/hyracks-control/hyracks-control-cc/src/main/java/org/apache/hyracks/control/cc/cluster/NodeManager.java @@ -101,7 +101,7 @@ public class NodeManager implements INodeManager { // Updates the node registry. 
if (nodeRegistry.containsKey(nodeId)) { LOGGER.warn("Node with name " + nodeId + " has already registered; failing the node then re-registering."); - failNonDeadNode(nodeId); + failNode(nodeId); } else { try { // TODO(mblow): it seems we should close IPC handles when we're done with them (like here) @@ -173,7 +173,7 @@ public class NodeManager implements INodeManager { return Pair.of(deadNodes, affectedJobIds); } - private void failNonDeadNode(String nodeId) throws HyracksException { + public void failNode(String nodeId) throws HyracksException { NodeControllerState state = nodeRegistry.get(nodeId); Set<JobId> affectedJobIds = state.getActiveJobIds(); // Removes the node from node map.