Repository: asterixdb
Updated Branches:
  refs/heads/master dd4cae8e5 -> cda3062f1


[NO ISSUE][CLUS] Add Metadata Cluster Partition

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
- Add a cluster partition reference to the cluster
  partition in which metadata is stored. This allows
  the initial metadata node to be removed from the
  cluster and another metadata node to be assigned
  to that metadata cluster partition. Initially,
  it is assigned to the first partition of the first
  metadata node.
- Use metadata cluster partition in defining metadata
  datasets file splits instead of the assumption of the
  first partition on the initial metadata node.

Change-Id: I2ac99252cacba92b4c4484c0d34cdc77fee307e8
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2348
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>


Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo
Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/cda3062f
Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/cda3062f
Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/cda3062f

Branch: refs/heads/master
Commit: cda3062f179c9c36efe40359adc2305002cc682d
Parents: dd4cae8
Author: Murtadha Hubail <mhub...@apache.org>
Authored: Sat Feb 3 08:17:21 2018 +0300
Committer: Murtadha Hubail <mhub...@apache.org>
Committed: Sat Feb 3 16:29:15 2018 -0800

----------------------------------------------------------------------
 .../runtime/ClusterStateManagerTest.java        |  1 +
 .../common/cluster/IClusterStateManager.java    | 14 ++++++++
 .../utils/SplitsAndConstraintsUtil.java         | 34 ++++++++++++--------
 .../runtime/utils/ClusterStateManager.java      | 26 ++++++++++-----
 4 files changed, 53 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cda3062f/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
index 4dd7463..4beb44a 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/runtime/ClusterStateManagerTest.java
@@ -231,6 +231,7 @@ public class ClusterStateManagerTest {
     private MetadataProperties mockMetadataProperties() {
         SortedMap<Integer, ClusterPartition> clusterPartitions = 
Collections.synchronizedSortedMap(new TreeMap<>());
         Map<String, ClusterPartition[]> nodePartitionsMap = new 
ConcurrentHashMap<>();
+        nodePartitionsMap.put(METADATA_NODE, new ClusterPartition[] { new 
ClusterPartition(0, METADATA_NODE, 0) });
         MetadataProperties metadataProperties = 
Mockito.mock(MetadataProperties.class);
         
Mockito.when(metadataProperties.getMetadataNodeName()).thenReturn(METADATA_NODE);
         
Mockito.when(metadataProperties.getClusterPartitions()).thenReturn(clusterPartitions);

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cda3062f/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
index 5b7e5a7..0a5707e 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/cluster/IClusterStateManager.java
@@ -224,4 +224,18 @@ public interface IClusterStateManager {
     boolean cancelRemovePending(String nodeId);
 
     Map<String, Map<IOption, Object>> getActiveNcConfiguration();
+
+    /**
+     * Sets the cluster partition in which metadata datasets stored
+     *
+     * @param partition
+     */
+    void setMetadataPartitionId(ClusterPartition partition);
+
+    /**
+     * Gets the cluster partition in which metadata datasets are stored
+     *
+     * @return The metadata cluster partitions
+     */
+    ClusterPartition getMetadataPartition();
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cda3062f/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
index 2a6d0e8..8ac69e4 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/SplitsAndConstraintsUtil.java
@@ -20,6 +20,8 @@ package org.apache.asterix.metadata.utils;
 
 import java.io.File;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import org.apache.asterix.common.cluster.ClusterPartition;
@@ -71,20 +73,11 @@ public class SplitsAndConstraintsUtil {
             String indexName, List<String> nodes) {
         final String relPath = 
StoragePathUtil.prepareDataverseIndexName(dataset.getDataverseName(),
                 dataset.getDatasetName(), indexName, 
dataset.getRebalanceCount());
-        List<FileSplit> splits = new ArrayList<>();
-        for (String nd : nodes) {
-            int numPartitions = clusterStateManager.getNodePartitionsCount(nd);
-            ClusterPartition[] nodePartitions = 
clusterStateManager.getNodePartitions(nd);
-            // currently this case is never executed since the metadata group 
doesn't exists
-            if 
(dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME)
 == 0) {
-                numPartitions = 1;
-            }
-
-            for (int k = 0; k < numPartitions; k++) {
-                File f = new 
File(StoragePathUtil.prepareStoragePartitionPath(nodePartitions[k].getPartitionId()),
-                        relPath);
-                
splits.add(StoragePathUtil.getFileSplitForClusterPartition(nodePartitions[k], 
f.getPath()));
-            }
+        final List<ClusterPartition> datasetPartitions = 
getDatasetPartitions(clusterStateManager, dataset, nodes);
+        final List<FileSplit> splits = new ArrayList<>();
+        for (ClusterPartition partition : datasetPartitions) {
+            File f = new 
File(StoragePathUtil.prepareStoragePartitionPath(partition.getPartitionId()), 
relPath);
+            
splits.add(StoragePathUtil.getFileSplitForClusterPartition(partition, 
f.getPath()));
         }
         return splits.toArray(new FileSplit[] {});
     }
@@ -94,4 +87,17 @@ public class SplitsAndConstraintsUtil {
         FileSplit[] splits = getDataverseSplits(clusterStateManager, 
dataverse);
         return StoragePathUtil.splitProviderAndPartitionConstraints(splits);
     }
+
+    private static List<ClusterPartition> 
getDatasetPartitions(IClusterStateManager clusterStateManager,
+            Dataset dataset, List<String> nodes) {
+        if 
(dataset.getNodeGroupName().compareTo(MetadataConstants.METADATA_NODEGROUP_NAME)
 == 0) {
+            return 
Collections.singletonList(clusterStateManager.getMetadataPartition());
+        }
+        final List<ClusterPartition> datasetPartitions = new ArrayList<>();
+        for (String node : nodes) {
+            final ClusterPartition[] nodePartitions = 
clusterStateManager.getNodePartitions(node);
+            datasetPartitions.addAll(Arrays.asList(nodePartitions));
+        }
+        return datasetPartitions;
+    }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/cda3062f/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
index c5eeb65..76668d2 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/utils/ClusterStateManager.java
@@ -32,10 +32,10 @@ import java.util.concurrent.TimeUnit;
 import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
 import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.cluster.IClusterStateManager;
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.common.replication.INcLifecycleCoordinator;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.transactions.IResourceIdManager;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -73,6 +73,7 @@ public class ClusterStateManager implements 
IClusterStateManager {
     private Set<String> participantNodes = new HashSet<>();
     private INcLifecycleCoordinator lifecycleCoordinator;
     private ICcApplicationContext appCtx;
+    private ClusterPartition metadataPartition;
 
     @Override
     public void setCcAppCtx(ICcApplicationContext appCtx) {
@@ -80,6 +81,7 @@ public class ClusterStateManager implements 
IClusterStateManager {
         node2PartitionsMap = 
appCtx.getMetadataProperties().getNodePartitions();
         clusterPartitions = 
appCtx.getMetadataProperties().getClusterPartitions();
         currentMetadataNode = 
appCtx.getMetadataProperties().getMetadataNodeName();
+        metadataPartition = node2PartitionsMap.get(currentMetadataNode)[0];
         lifecycleCoordinator = appCtx.getNcLifecycleCoordinator();
         lifecycleCoordinator.bindTo(this);
     }
@@ -121,16 +123,18 @@ public class ClusterStateManager implements 
IClusterStateManager {
     }
 
     @Override
-    public void updateMetadataNode(String nodeId, boolean active) {
+    public synchronized void updateMetadataNode(String nodeId, boolean active) 
{
         currentMetadataNode = nodeId;
         metadataNodeActive = active;
         if (active) {
+            metadataPartition.setActiveNodeId(currentMetadataNode);
             LOGGER.info(String.format("Metadata node %s is now active", 
currentMetadataNode));
         }
+        notifyAll();
     }
 
     @Override
-    public synchronized void updateNodePartitions(String nodeId, boolean 
active) throws HyracksDataException {
+    public synchronized void updateNodePartitions(String nodeId, boolean 
active) {
         if (active) {
             participantNodes.add(nodeId);
         } else {
@@ -306,11 +310,7 @@ public class ClusterStateManager implements 
IClusterStateManager {
 
     @Override
     public synchronized ClusterPartition[] getClusterPartitons() {
-        ArrayList<ClusterPartition> partitons = new ArrayList<>();
-        for (ClusterPartition partition : clusterPartitions.values()) {
-            partitons.add(partition);
-        }
-        return partitons.toArray(new ClusterPartition[] {});
+        return clusterPartitions.values().toArray(new ClusterPartition[] {});
     }
 
     @Override
@@ -442,6 +442,16 @@ public class ClusterStateManager implements 
IClusterStateManager {
         return new HashSet<>(pendingRemoval);
     }
 
+    @Override
+    public synchronized void setMetadataPartitionId(ClusterPartition 
partition) {
+        metadataPartition = partition;
+    }
+
+    @Override
+    public synchronized ClusterPartition getMetadataPartition() {
+        return metadataPartition;
+    }
+
     private void updateNodeConfig(String nodeId, Map<IOption, Object> 
configuration) {
         ConfigManager configManager =
                 ((ConfigManagerApplicationConfig) 
appCtx.getServiceContext().getAppConfig()).getConfigManager();

Reply via email to