http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
index 98dabc9..ea32e94 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java
@@ -40,10 +40,12 @@ import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.hyracks.bootstrap.CCApplication;
 import org.apache.asterix.hyracks.bootstrap.NCApplication;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.application.ICCApplication;
 import org.apache.hyracks.api.application.INCApplication;
 import org.apache.hyracks.api.client.HyracksConnection;
 import org.apache.hyracks.api.client.IHyracksClientConnection;
+import org.apache.hyracks.api.config.IOption;
 import org.apache.hyracks.control.cc.ClusterControllerService;
 import org.apache.hyracks.control.common.config.ConfigManager;
 import org.apache.hyracks.control.common.controllers.CCConfig;
@@ -52,30 +54,55 @@ import 
org.apache.hyracks.control.common.controllers.NCConfig;
 import org.apache.hyracks.control.nc.NodeControllerService;
 import org.kohsuke.args4j.CmdLineException;
 
+@SuppressWarnings({"squid:ClassVariableVisibilityCheck","squid:S00112"})
 public class AsterixHyracksIntegrationUtil {
-    static class LoggerHolder {
-        static final Logger LOGGER = 
Logger.getLogger(AsterixHyracksIntegrationUtil.class.getName());
-
-        private LoggerHolder() {
-        }
-    }
-
     public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098;
     public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099;
-
+    public static final String DEFAULT_CONF_FILE = joinPath("asterixdb", 
"asterix-app", "src", "test", "resources",
+            "cc.conf");
+    private static final String DEFAULT_STORAGE_PATH = joinPath("target", 
"io", "dir");
+    private static String storagePath = DEFAULT_STORAGE_PATH;
     public ClusterControllerService cc;
-    public NodeControllerService[] ncs = new NodeControllerService[0];
+    public NodeControllerService[] ncs = new NodeControllerService[2];
     public IHyracksClientConnection hcc;
     protected boolean gracefulShutdown = true;
-
-    private static final String DEFAULT_STORAGE_PATH = joinPath("target", 
"io", "dir");
-    private static String storagePath = DEFAULT_STORAGE_PATH;
+    List<Pair<IOption, Object>> opts = new ArrayList<>();
     private ConfigManager configManager;
     private List<String> nodeNames;
 
-    public void init(boolean deleteOldInstanceData) throws Exception {
+    public static void setStoragePath(String path) {
+        storagePath = path;
+    }
+
+    public static void restoreDefaultStoragePath() {
+        storagePath = DEFAULT_STORAGE_PATH;
+    }
+
+    /**
+     * main method to run a simple 2 node cluster in-process
+     * suggested VM arguments: <code>-enableassertions -Xmx2048m 
-Dfile.encoding=UTF-8</code>
+     *
+     * @param args
+     *            unused
+     */
+    public static void main(String[] args) throws Exception {
+        AsterixHyracksIntegrationUtil integrationUtil = new 
AsterixHyracksIntegrationUtil();
+        try {
+            integrationUtil.run(Boolean.getBoolean("cleanup.start"), 
Boolean.getBoolean("cleanup.shutdown"),
+                    System.getProperty("external.lib", ""), 
System.getProperty("conf.path", DEFAULT_CONF_FILE));
+        } catch (Exception e) {
+            LOGGER.log(Level.WARNING, "Unexpected exception", e);
+            System.exit(1);
+        }
+    }
+
+    public void init(boolean deleteOldInstanceData, String confFile) throws 
Exception { //NOSONAR
         final ICCApplication ccApplication = createCCApplication();
-        configManager = new ConfigManager();
+        if (confFile == null) {
+            configManager = new ConfigManager();
+        } else {
+            configManager = new ConfigManager(new String[] { "-config-file", 
confFile });
+        }
         ccApplication.registerConfig(configManager);
         final CCConfig ccConfig = createCCConfig(configManager);
         cc = new ClusterControllerService(ccConfig, ccApplication);
@@ -90,12 +117,18 @@ public class AsterixHyracksIntegrationUtil {
             // mark this NC as virtual in the CC's config manager, so he 
doesn't try to contact NCService...
             configManager.set(nodeId, NCConfig.Option.NCSERVICE_PORT, 
NCConfig.NCSERVICE_PORT_DISABLED);
             final INCApplication ncApplication = createNCApplication();
-            ConfigManager ncConfigManager = new ConfigManager();
+            ConfigManager ncConfigManager;
+            if (confFile == null) {
+                ncConfigManager = new ConfigManager();
+            } else {
+                ncConfigManager = new ConfigManager(new String[] { 
"-config-file", confFile });
+            }
             ncApplication.registerConfig(ncConfigManager);
             nodeControllers.add(
                     new 
NodeControllerService(fixupIODevices(createNCConfig(nodeId, ncConfigManager)), 
ncApplication));
-        }
+        } ;
 
+        opts.stream().forEach(opt -> configManager.set(opt.getLeft(), 
opt.getRight()));
         cc.start();
 
         // Starts ncs.
@@ -125,11 +158,11 @@ public class AsterixHyracksIntegrationUtil {
         this.ncs = nodeControllers.toArray(new 
NodeControllerService[nodeControllers.size()]);
     }
 
-    public void init(boolean deleteOldInstanceData, String externalLibPath) 
throws Exception {
+    public void init(boolean deleteOldInstanceData, String externalLibPath, 
String confDir) throws Exception {
         List<ILibraryManager> libraryManagers = new ArrayList<>();
         ExternalUDFLibrarian librarian = new 
ExternalUDFLibrarian(libraryManagers);
         librarian.cleanup();
-        init(deleteOldInstanceData);
+        init(deleteOldInstanceData, confDir);
         if (externalLibPath != null && externalLibPath.length() != 0) {
             libraryManagers.add(((ICcApplicationContext) 
cc.getApplicationContext()).getLibraryManager());
             for (NodeControllerService nc : ncs) {
@@ -246,18 +279,10 @@ public class AsterixHyracksIntegrationUtil {
         }
     }
 
-    public static void setStoragePath(String path) {
-        storagePath = path;
-    }
-
     public void setGracefulShutdown(boolean gracefulShutdown) {
         this.gracefulShutdown = gracefulShutdown;
     }
 
-    public static void restoreDefaultStoragePath() {
-        storagePath = DEFAULT_STORAGE_PATH;
-    }
-
     protected String getDefaultStoragePath() {
         return storagePath;
     }
@@ -280,21 +305,22 @@ public class AsterixHyracksIntegrationUtil {
         }
     }
 
-    /**
-     * main method to run a simple 2 node cluster in-process
-     * suggested VM arguments: <code>-enableassertions -Xmx2048m 
-Dfile.encoding=UTF-8</code>
-     *
-     * @param args
-     *            unused
-     */
-    public static void main(String[] args) throws Exception {
-        AsterixHyracksIntegrationUtil integrationUtil = new 
AsterixHyracksIntegrationUtil();
-        try {
-            integrationUtil.run(Boolean.getBoolean("cleanup.start"), 
Boolean.getBoolean("cleanup.shutdown"),
-                    System.getProperty("external.lib", ""));
-        } catch (Exception e) {
-            LOGGER.log(Level.WARNING, "Unexpected exception", e);
-            System.exit(1);
+    protected void run(boolean cleanupOnStart, boolean cleanupOnShutdown, 
String loadExternalLibs, String confFile)
+            throws Exception {
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                try {
+                    deinit(cleanupOnShutdown);
+                } catch (Exception e) {
+                    LOGGER.log(Level.WARNING, "Unexpected exception on 
shutdown", e);
+                }
+            }
+        });
+
+        init(cleanupOnStart, loadExternalLibs, confFile);
+        while (true) {
+            Thread.sleep(10000);
         }
     }
 
@@ -309,7 +335,6 @@ public class AsterixHyracksIntegrationUtil {
                 }
             }
         });
-        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, 
"asterix-build-configuration.xml");
 
         init(cleanupOnStart, loadExternalLibs);
         while (true) {
@@ -317,6 +342,17 @@ public class AsterixHyracksIntegrationUtil {
         }
     }
 
+    public void addOption(IOption name, Object value) {
+        opts.add(Pair.of(name, value));
+    }
+
+    static class LoggerHolder {
+        static final Logger LOGGER = 
Logger.getLogger(AsterixHyracksIntegrationUtil.class.getName());
+
+        private LoggerHolder() {
+        }
+    }
+
     private class UngracefulShutdownNCApplication extends NCApplication {
         @Override
         public void stop() throws Exception {
@@ -324,4 +360,5 @@ public class AsterixHyracksIntegrationUtil {
             webManager.stop();
         }
     }
+
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
index b1d2159..75207e8 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java
@@ -19,18 +19,25 @@
 package org.apache.asterix.app.external;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.rmi.RemoteException;
+import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.List;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipFile;
 
 import org.apache.asterix.common.exceptions.ACIDException;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.library.ILibraryManager;
-import org.apache.asterix.event.service.AsterixEventServiceUtil;
+import org.apache.commons.compress.utils.IOUtils;
 import org.apache.commons.io.FileUtils;
 import org.apache.hyracks.algebricks.common.utils.Pair;
 
+@SuppressWarnings("squid:S134")
 public class ExternalUDFLibrarian implements IExternalUDFLibrarian {
 
     // The following list includes a library manager for the CC
@@ -41,6 +48,38 @@ public class ExternalUDFLibrarian implements 
IExternalUDFLibrarian {
         this.libraryManagers = libraryManagers;
     }
 
+    public static void removeLibraryDir() throws IOException {
+        File installLibDir = ExternalLibraryUtils.getLibraryInstallDir();
+        FileUtils.deleteQuietly(installLibDir);
+    }
+
+    public static void unzip(String sourceFile, String outputDir) throws 
IOException {
+        if (System.getProperty("os.name").toLowerCase().startsWith("win")) {
+            try (ZipFile zipFile = new ZipFile(sourceFile)) {
+                Enumeration<? extends ZipEntry> entries = zipFile.entries();
+                while (entries.hasMoreElements()) {
+                    ZipEntry entry = entries.nextElement();
+                    File entryDestination = new File(outputDir, 
entry.getName());
+                    if (!entry.isDirectory()) {
+                        entryDestination.getParentFile().mkdirs();
+                        try (InputStream in = zipFile.getInputStream(entry);
+                                OutputStream out = new 
FileOutputStream(entryDestination)) {
+                            IOUtils.copy(in, out);
+                        }
+                    }
+                }
+            }
+        } else {
+            Process process = new ProcessBuilder("unzip", "-d", outputDir, 
sourceFile).start();
+            try {
+                process.waitFor();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                throw new IOException(e);
+            }
+        }
+    }
+
     @Override
     public void install(String dvName, String libName, String libPath) throws 
Exception {
         // get the directory of the to be installed libraries
@@ -55,7 +94,7 @@ public class ExternalUDFLibrarian implements 
IExternalUDFLibrarian {
         FileUtils.deleteQuietly(destinationDir);
         destinationDir.mkdirs();
         try {
-            AsterixEventServiceUtil.unzip(libPath, 
destinationDir.getAbsolutePath());
+            unzip(libPath, destinationDir.getAbsolutePath());
         } catch (Exception e) {
 
             throw new Exception("Couldn't unzip the file: " + libPath, e);
@@ -78,11 +117,6 @@ public class ExternalUDFLibrarian implements 
IExternalUDFLibrarian {
         }
     }
 
-    public static void removeLibraryDir() throws IOException {
-        File installLibDir = ExternalLibraryUtils.getLibraryInstallDir();
-        FileUtils.deleteQuietly(installLibDir);
-    }
-
     public void cleanup() throws AsterixException, RemoteException, 
ACIDException {
         for (ILibraryManager libraryManager : libraryManagers) {
             List<Pair<String, String>> libs = libraryManager.getAllLibraries();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
index 81b232a..df3ca64 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java
@@ -41,7 +41,6 @@ import org.apache.asterix.common.config.ActiveProperties;
 import org.apache.asterix.common.config.AsterixExtension;
 import org.apache.asterix.common.config.BuildProperties;
 import org.apache.asterix.common.config.CompilerProperties;
-import org.apache.asterix.common.config.ExtensionProperties;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.MessagingProperties;
 import org.apache.asterix.common.config.MetadataProperties;
@@ -60,6 +59,7 @@ import 
org.apache.asterix.common.replication.IRemoteRecoveryManager;
 import org.apache.asterix.common.replication.IReplicaResourcesManager;
 import org.apache.asterix.common.replication.IReplicationChannel;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
@@ -165,7 +165,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         if (extensions != null) {
             allExtensions.addAll(extensions);
         }
-        allExtensions.addAll(new 
ExtensionProperties(propertiesAccessor).getExtensions());
+        allExtensions.addAll(propertiesAccessor.getExtensions());
         ncExtensionManager = new NCExtensionManager(allExtensions);
         componentProvider = new StorageComponentProvider();
         resourceIdFactory = new 
GlobalResourceIdFactoryProvider(ncServiceContext).createResourceIdFactory();
@@ -189,6 +189,7 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
         ILocalResourceRepositoryFactory 
persistentLocalResourceRepositoryFactory =
                 new PersistentLocalResourceRepositoryFactory(ioManager, 
getServiceContext().getNodeId(),
                         metadataProperties, indexCheckpointManagerProvider);
+
         localResourceRepository =
                 (PersistentLocalResourceRepository) 
persistentLocalResourceRepositoryFactory.createRepository();
 
@@ -219,43 +220,48 @@ public class NCAppRuntimeContext implements 
INcApplicationContext {
                 activeProperties.getMemoryComponentGlobalBudget(), 
compilerProperties.getFrameSize(),
                 this.ncServiceContext);
 
-        if 
(replicationProperties.isParticipant(getServiceContext().getNodeId())) {
+        if (replicationProperties.isReplicationEnabled()) {
 
             replicaResourcesManager = new 
ReplicaResourcesManager(localResourceRepository, metadataProperties,
                     indexCheckpointManagerProvider);
 
             replicationManager = new ReplicationManager(nodeId, 
replicationProperties, replicaResourcesManager,
-                    txnSubsystem.getLogManager(), 
asterixAppRuntimeContextProvider);
-
-            //pass replication manager to replication required object
-            //LogManager to replicate logs
-            
txnSubsystem.getLogManager().setReplicationManager(replicationManager);
-
-            //PersistentLocalResourceRepository to replicated metadata files 
and delete backups on drop index
-            localResourceRepository.setReplicationManager(replicationManager);
-
-            /*
-             * add the partitions that will be replicated in this node as 
inactive partitions
-             */
-            //get nodes which replicated to this node
-            Set<String> remotePrimaryReplicas = 
replicationProperties.getRemotePrimaryReplicasIds(nodeId);
-            for (String clientId : remotePrimaryReplicas) {
-                //get the partitions of each client
-                ClusterPartition[] clientPartitions = 
metadataProperties.getNodePartitions().get(clientId);
-                for (ClusterPartition partition : clientPartitions) {
-                    
localResourceRepository.addInactivePartition(partition.getPartitionId());
+                    txnSubsystem.getLogManager(), 
asterixAppRuntimeContextProvider, ncServiceContext);
+
+            if 
(replicationManager.getReplicationStrategy().isParticipant(getServiceContext().getNodeId()))
 {
+
+                //pass replication manager to replication required object
+                //LogManager to replicate logs
+                
txnSubsystem.getLogManager().setReplicationManager(replicationManager);
+
+                //PersistentLocalResourceRepository to replicate metadata 
files and delete backups on drop index
+                
localResourceRepository.setReplicationManager(replicationManager);
+
+                /*
+                 * add the partitions that will be replicated in this node as 
inactive partitions
+                 */
+                //get nodes which replicate to this node
+                Set<String> remotePrimaryReplicas = 
replicationManager.getReplicationStrategy()
+                        
.getRemotePrimaryReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet());
+                for (String clientId : remotePrimaryReplicas) {
+                    //get the partitions of each client
+                    ClusterPartition[] clientPartitions = 
metadataProperties.getNodePartitions().get(clientId);
+                    for (ClusterPartition partition : clientPartitions) {
+                        
localResourceRepository.addInactivePartition(partition.getPartitionId());
+                    }
                 }
-            }
 
-            //initialize replication channel
-            replicationChannel = new ReplicationChannel(nodeId, 
replicationProperties, txnSubsystem.getLogManager(),
-                    replicaResourcesManager, replicationManager, 
getServiceContext(), asterixAppRuntimeContextProvider);
+                //initialize replication channel
+                replicationChannel = new ReplicationChannel(nodeId, 
replicationProperties, txnSubsystem.getLogManager(),
+                        replicaResourcesManager, replicationManager, 
getServiceContext(),
+                        asterixAppRuntimeContextProvider, 
replicationManager.getReplicationStrategy());
 
-            remoteRecoveryManager = new 
RemoteRecoveryManager(replicationManager, this, replicationProperties);
+                remoteRecoveryManager = new 
RemoteRecoveryManager(replicationManager, this, replicationProperties);
 
-            bufferCache = new BufferCache(ioManager, prs, pcp, new 
FileMapManager(),
-                    storageProperties.getBufferCacheMaxOpenFiles(), 
getServiceContext().getThreadFactory(),
-                    replicationManager);
+                bufferCache = new BufferCache(ioManager, prs, pcp, new 
FileMapManager(),
+                        storageProperties.getBufferCacheMaxOpenFiles(), 
getServiceContext().getThreadFactory(),
+                        replicationManager);
+            }
         } else {
             bufferCache = new BufferCache(ioManager, prs, pcp, new 
FileMapManager(),
                     storageProperties.getBufferCacheMaxOpenFiles(), 
getServiceContext().getThreadFactory());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
index 22fa459..dc22342 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java
@@ -101,9 +101,9 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
         this.serviceCtx = serviceCtx;
         this.txnSubsystem = txnSubsystem;
         logMgr = (LogManager) txnSubsystem.getLogManager();
-        ReplicationProperties repProperties =
-                
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationProperties();
-        replicationEnabled = repProperties.isParticipant(txnSubsystem.getId());
+        ReplicationProperties repProperties = 
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext()
+                .getReplicationProperties();
+        replicationEnabled = repProperties.isReplicationEnabled();
         localResourceRepository = (PersistentLocalResourceRepository) 
txnSubsystem.getAsterixAppRuntimeContextProvider()
                 .getLocalResourceRepository();
         cachedEntityCommitsPerJobSize = 
txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize();
@@ -426,16 +426,16 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
 
     @Override
     public long getLocalMinFirstLSN() throws HyracksDataException {
-        IDatasetLifecycleManager datasetLifecycleManager =
-                
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
+        IDatasetLifecycleManager datasetLifecycleManager = 
txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getDatasetLifecycleManager();
         List<IIndex> openIndexList = 
datasetLifecycleManager.getOpenResources();
         long firstLSN;
         //the min first lsn can only be the current append or smaller
         long minFirstLSN = logMgr.getAppendLSN();
         if (!openIndexList.isEmpty()) {
             for (IIndex index : openIndexList) {
-                AbstractLSMIOOperationCallback ioCallback =
-                        (AbstractLSMIOOperationCallback) ((ILSMIndex) 
index).getIOOperationCallback();
+                AbstractLSMIOOperationCallback ioCallback = 
(AbstractLSMIOOperationCallback) ((ILSMIndex) index)
+                        .getIOOperationCallback();
                 if (!((AbstractLSMIndex) 
index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) {
                     firstLSN = ioCallback.getFirstLSN();
                     minFirstLSN = Math.min(minFirstLSN, firstLSN);
@@ -446,8 +446,8 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
     }
 
     private long getRemoteMinFirstLSN() throws HyracksDataException {
-        IReplicaResourcesManager remoteResourcesManager =
-                
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
+        IReplicaResourcesManager remoteResourcesManager = 
txnSubsystem.getAsterixAppRuntimeContextProvider()
+                .getAppContext().getReplicaResourcesManager();
         return 
remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions());
     }
 
@@ -566,9 +566,9 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
                         if 
(activePartitions.contains(logRecord.getResourcePartition())) {
                             undoLSNSet = 
jobLoserEntity2LSNsMap.get(tempKeyTxnEntityId);
                             if (undoLSNSet == null) {
-                                loserEntity =
-                                        new TxnEntityId(logTxnId, 
logRecord.getDatasetId(), logRecord.getPKHashValue(),
-                                                logRecord.getPKValue(), 
logRecord.getPKValueSize(), true);
+                                loserEntity = new TxnEntityId(logTxnId, 
logRecord.getDatasetId(),
+                                        logRecord.getPKHashValue(), 
logRecord.getPKValue(), logRecord.getPKValueSize(),
+                                        true);
                                 undoLSNSet = new LinkedList<>();
                                 jobLoserEntity2LSNsMap.put(loserEntity, 
undoLSNSet);
                             }
@@ -611,8 +611,8 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
             //undo loserTxn's effect
             LOGGER.log(Level.INFO, "undoing loser transaction's effect");
 
-            IDatasetLifecycleManager datasetLifecycleManager =
-                    
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
+            IDatasetLifecycleManager datasetLifecycleManager = 
txnSubsystem.getAsterixAppRuntimeContextProvider()
+                    .getDatasetLifecycleManager();
             //TODO sort loser entities by smallest LSN to undo in one pass.
             Iterator<Entry<TxnEntityId, List<Long>>> iter = 
jobLoserEntity2LSNsMap.entrySet().iterator();
             int undoCount = 0;
@@ -664,8 +664,8 @@ public class RecoveryManager implements IRecoveryManager, 
ILifeCycleComponent {
 
     private static void undo(ILogRecord logRecord, IDatasetLifecycleManager 
datasetLifecycleManager) {
         try {
-            ILSMIndex index =
-                    (ILSMIndex) 
datasetLifecycleManager.getIndex(logRecord.getDatasetId(), 
logRecord.getResourceId());
+            ILSMIndex index = (ILSMIndex) 
datasetLifecycleManager.getIndex(logRecord.getDatasetId(),
+                    logRecord.getResourceId());
             ILSMIndexAccessor indexAccessor = 
index.createAccessor(NoOpIndexAccessParameters.INSTANCE);
             if (logRecord.getNewOp() == 
AbstractIndexModificationOperationCallback.INSERT_BYTE) {
                 indexAccessor.forceDelete(logRecord.getNewValue());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
index 47f9315..41a0c3b 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java
@@ -25,7 +25,6 @@ import java.util.logging.Logger;
 import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.config.TransactionProperties;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.transactions.Checkpoint;
 import org.apache.asterix.common.transactions.CheckpointProperties;
 import org.apache.asterix.common.transactions.IAppRuntimeContextProvider;
@@ -72,8 +71,8 @@ public class TransactionSubsystem implements 
ITransactionSubsystem {
         this.lockManager = new 
ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer());
         ReplicationProperties repProperties =
                 
asterixAppRuntimeContextProvider.getAppContext().getReplicationProperties();
-        IReplicationStrategy replicationStrategy = 
repProperties.getReplicationStrategy();
-        final boolean replicationEnabled = repProperties.isParticipant(id);
+        final boolean replicationEnabled = 
repProperties.isReplicationEnabled();
+
         final CheckpointProperties checkpointProperties = new 
CheckpointProperties(txnProperties, id);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.log(Level.INFO, "Checkpoint Properties: " + 
checkpointProperties);
@@ -84,11 +83,7 @@ public class TransactionSubsystem implements 
ITransactionSubsystem {
             transactionManager.ensureMaxTxnId(latestCheckpoint.getMaxTxnId());
         }
 
-        if (replicationEnabled) {
-            this.logManager = new LogManagerWithReplication(this, 
replicationStrategy);
-        } else {
-            this.logManager = new LogManager(this);
-        }
+        this.logManager = replicationEnabled ? new 
LogManagerWithReplication(this) : new LogManager(this);
         this.recoveryManager = new RecoveryManager(this, serviceCtx);
         if (this.txnProperties.isCommitProfilerEnabled()) {
             ecp = new EntityCommitProfiler(this, 
this.txnProperties.getCommitProfilerReportInterval());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
index e4a6f4b..c00b1b6 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
+import java.util.stream.Collectors;
 
 import org.apache.asterix.app.nc.task.BindMetadataNodeTask;
 import org.apache.asterix.app.nc.task.CheckpointTask;
@@ -61,9 +62,11 @@ import 
org.apache.asterix.common.messaging.api.ICCMessageBroker;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.replication.INCLifecycleMessage;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.Replica;
 import org.apache.asterix.common.transactions.IRecoveryManager.SystemState;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.util.FaultToleranceUtil;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hyracks.api.application.ICCServiceContext;
 import 
org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -84,9 +87,11 @@ public class AutoFaultToleranceStrategy implements 
IFaultToleranceStrategy {
     private IReplicationStrategy replicationStrategy;
     private ICCServiceContext serviceCtx;
     private Set<String> pendingStartupCompletionNodes = new HashSet<>();
+    private List<String> nodeIds;
+    private Map<String, SystemState> startupQueue = new HashMap<>();
 
     @Override
-    public void notifyNodeJoin(String nodeId) throws HyracksDataException {
+    public void notifyNodeJoin(String nodeId) {
         pendingStartupCompletionNodes.add(nodeId);
     }
 
@@ -135,14 +140,13 @@ public class AutoFaultToleranceStrategy implements 
IFaultToleranceStrategy {
     private synchronized void requestPartitionsTakeover(String failedNodeId) {
         //replica -> list of partitions to takeover
         Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>();
-        ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
-        ReplicationProperties replicationProperties = 
appCtx.getReplicationProperties();
         //collect the partitions of the failed NC
         List<ClusterPartition> lostPartitions = 
getNodeAssignedPartitions(failedNodeId);
         if (!lostPartitions.isEmpty()) {
             for (ClusterPartition partition : lostPartitions) {
                 //find replicas for this partitions
-                Set<String> partitionReplicas = 
replicationProperties.getNodeReplicasIds(partition.getNodeId());
+                Set<String> partitionReplicas = 
replicationStrategy.getRemoteReplicas(partition.getNodeId()).stream()
+                        .map(Replica::getId).collect(Collectors.toSet());
                 //find a replica that is still active
                 for (String replica : partitionReplicas) {
                     //TODO (mhubail) currently this assigns the partition to 
the first found active replica.
@@ -203,9 +207,8 @@ public class AutoFaultToleranceStrategy implements 
IFaultToleranceStrategy {
         planId2FailbackPlanMap.put(plan.getPlanId(), plan);
 
         //get all partitions this node requires to resync
-        ICcApplicationContext appCtx = (ICcApplicationContext) 
serviceCtx.getApplicationContext();
-        ReplicationProperties replicationProperties = 
appCtx.getReplicationProperties();
-        Set<String> nodeReplicas = 
replicationProperties.getNodeReplicasIds(failingBackNodeId);
+        Set<String> nodeReplicas = 
replicationStrategy.getRemoteReplicas(failingBackNodeId).stream().map(Replica::getId)
+                .collect(Collectors.toSet());
         clusterManager.getClusterPartitons();
         for (String replicaId : nodeReplicas) {
             ClusterPartition[] nodePartitions = 
clusterManager.getNodePartitions(replicaId);
@@ -425,6 +428,7 @@ public class AutoFaultToleranceStrategy implements 
IFaultToleranceStrategy {
         ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker();
         ft.replicationStrategy = replicationStrategy;
         ft.serviceCtx = serviceCtx;
+        ft.nodeIds = serviceCtx.getAppConfig().getNCNames();
         return ft;
     }
 
@@ -486,18 +490,29 @@ public class AutoFaultToleranceStrategy implements 
IFaultToleranceStrategy {
     private synchronized void process(RegistrationTasksRequestMessage msg) 
throws HyracksDataException {
         final String nodeId = msg.getNodeId();
         final SystemState state = msg.getState();
-        List<INCLifecycleTask> tasks;
-        if (state == SystemState.BOOTSTRAPPING || state == 
SystemState.HEALTHY) {
-            tasks = buildStartupSequence(nodeId);
+        //last node needed to start
+        if (startupQueue.keySet().size() == nodeIds.size() - 1) {
+            startupQueue.put(nodeId, state);
+            for (Map.Entry<String, SystemState> nodeState : 
startupQueue.entrySet()) {
+                List<INCLifecycleTask> tasks = 
buildStartupSequence(nodeState.getKey());
+                RegistrationTasksResponseMessage response = new 
RegistrationTasksResponseMessage(nodeState.getKey(),
+                        tasks);
+                try {
+                    messageBroker.sendApplicationMessageToNC(response, 
nodeState.getKey());
+                } catch (Exception e) {
+                    throw HyracksDataException.create(e);
+                }
+            }
+        } else if (!failedNodes.isEmpty()) {
+            List<INCLifecycleTask> tasks = buildFailbackStartupSequence();
+            RegistrationTasksResponseMessage response = new 
RegistrationTasksResponseMessage(nodeId, tasks);
+            try {
+                messageBroker.sendApplicationMessageToNC(response, 
msg.getNodeId());
+            } catch (Exception e) {
+                throw HyracksDataException.create(e);
+            }
         } else {
-            // failed node returned. Need to start failback process
-            tasks = buildFailbackStartupSequence();
-        }
-        RegistrationTasksResponseMessage response = new 
RegistrationTasksResponseMessage(nodeId, tasks);
-        try {
-            messageBroker.sendApplicationMessageToNC(response, 
msg.getNodeId());
-        } catch (Exception e) {
-            throw HyracksDataException.create(e);
+            startupQueue.put(nodeId, state);
         }
     }
 
@@ -525,4 +540,4 @@ public class AutoFaultToleranceStrategy implements 
IFaultToleranceStrategy {
         }
         return tasks;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
index 4e8ecd9..fe24fca 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java
@@ -18,47 +18,31 @@
  */
 package org.apache.asterix.app.replication;
 
-import java.util.HashMap;
-import java.util.Map;
-
+import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.replication.IReplicationStrategy;
-import org.apache.asterix.event.schema.cluster.Cluster;
 import org.apache.hyracks.api.application.ICCServiceContext;
 
 public class FaultToleranceStrategyFactory {
 
-    private static final Map<String, Class<? extends IFaultToleranceStrategy>> 
BUILT_IN_FAULT_TOLERANCE_STRATEGY =
-            new HashMap<>();
-
-    static {
-        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("no_fault_tolerance", 
NoFaultToleranceStrategy.class);
-        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("metadata_node", 
MetadataNodeFaultToleranceStrategy.class);
-        BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("auto", 
AutoFaultToleranceStrategy.class);
-    }
-
     private FaultToleranceStrategyFactory() {
         throw new AssertionError();
     }
 
-    public static IFaultToleranceStrategy create(Cluster cluster, 
IReplicationStrategy repStrategy,
-            ICCServiceContext serviceCtx) {
-        boolean highAvailabilityEnabled =
-                cluster.getHighAvailability() != null && 
cluster.getHighAvailability().getEnabled() != null
-                        && 
Boolean.valueOf(cluster.getHighAvailability().getEnabled());
-
-        if (!highAvailabilityEnabled || 
cluster.getHighAvailability().getFaultTolerance() == null
-                || 
cluster.getHighAvailability().getFaultTolerance().getStrategy() == null) {
-            return new NoFaultToleranceStrategy().from(serviceCtx, 
repStrategy);
-        }
-        String strategyName = 
cluster.getHighAvailability().getFaultTolerance().getStrategy().toLowerCase();
-        if (!BUILT_IN_FAULT_TOLERANCE_STRATEGY.containsKey(strategyName)) {
-            throw new IllegalArgumentException(String.format("Unsupported 
Replication Strategy. Available types: %s",
-                    BUILT_IN_FAULT_TOLERANCE_STRATEGY.keySet().toString()));
+    public static final String STRATEGY_NAME = "metadata_only";
+
+    public static IFaultToleranceStrategy create(ICCServiceContext serviceCtx, 
ReplicationProperties repProp,
+            IReplicationStrategy strategy) {
+        Class<? extends IFaultToleranceStrategy> clazz;
+        if (!repProp.isReplicationEnabled()) {
+            clazz = NoFaultToleranceStrategy.class;
+        } else if (STRATEGY_NAME.equals(repProp.getReplicationStrategy())) {
+            clazz = MetadataNodeFaultToleranceStrategy.class;
+        } else {
+            clazz = AutoFaultToleranceStrategy.class;
         }
-        Class<? extends IFaultToleranceStrategy> clazz = 
BUILT_IN_FAULT_TOLERANCE_STRATEGY.get(strategyName);
         try {
-            return clazz.newInstance().from(serviceCtx, repStrategy);
+            return clazz.newInstance().from(serviceCtx, strategy);
         } catch (InstantiationException | IllegalAccessException e) {
             throw new IllegalStateException(e);
         }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
index 3470d7b..0080bbd 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java
@@ -234,6 +234,9 @@ public class MetadataNodeFaultToleranceStrategy implements 
IFaultToleranceStrate
         final List<INCLifecycleTask> tasks = new ArrayList<>();
         switch (state) {
             case PERMANENT_DATA_LOSS:
+                if (failedNodes.isEmpty()) { //bootstrap
+                    break;
+                }
                 // If the metadata node (or replica) failed and lost its data
                 // => Metadata Remote Recovery from standby replica
                 tasks.add(getMetadataPartitionRecoveryPlan());

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 6976507..5609605 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -59,7 +59,6 @@ import org.apache.asterix.app.result.ResultHandle;
 import org.apache.asterix.app.result.ResultReader;
 import org.apache.asterix.common.api.IMetadataLockManager;
 import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.DatasetConfig.DatasetType;
 import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp;
 import org.apache.asterix.common.config.DatasetConfig.IndexType;
@@ -496,8 +495,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
             throw new CompilationException("Unknown compaction policy: " + 
compactionPolicy);
         }
         String compactionPolicyFactoryClassName = 
compactionPolicyEntity.getClassName();
-        ILSMMergePolicyFactory mergePolicyFactory =
-                (ILSMMergePolicyFactory) 
Class.forName(compactionPolicyFactoryClassName).newInstance();
+        ILSMMergePolicyFactory mergePolicyFactory = (ILSMMergePolicyFactory) 
Class
+                .forName(compactionPolicyFactoryClassName).newInstance();
         if (isExternalDataset && 
mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) {
             throw new CompilationException("The correlated-prefix merge policy 
cannot be used with external dataset.");
         }
@@ -584,10 +583,10 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     }
                     ARecordType metaRecType = (ARecordType) metaItemType;
 
-                    List<List<String>> partitioningExprs =
-                            ((InternalDetailsDecl) 
dd.getDatasetDetailsDecl()).getPartitioningExprs();
-                    List<Integer> keySourceIndicators =
-                            ((InternalDetailsDecl) 
dd.getDatasetDetailsDecl()).getKeySourceIndicators();
+                    List<List<String>> partitioningExprs = 
((InternalDetailsDecl) dd.getDatasetDetailsDecl())
+                            .getPartitioningExprs();
+                    List<Integer> keySourceIndicators = ((InternalDetailsDecl) 
dd.getDatasetDetailsDecl())
+                            .getKeySourceIndicators();
                     boolean autogenerated = ((InternalDetailsDecl) 
dd.getDatasetDetailsDecl()).isAutogenerated();
                     ARecordType aRecordType = (ARecordType) itemType;
                     List<IAType> partitioningTypes = 
ValidateUtil.validatePartitioningExpressions(aRecordType,
@@ -612,8 +611,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                     String adapter = ((ExternalDetailsDecl) 
dd.getDatasetDetailsDecl()).getAdapter();
                     Map<String, String> properties = ((ExternalDetailsDecl) 
dd.getDatasetDetailsDecl()).getProperties();
 
-                    datasetDetails =
-                            new ExternalDatasetDetails(adapter, properties, 
new Date(), TransactionState.COMMIT);
+                    datasetDetails = new ExternalDatasetDetails(adapter, 
properties, new Date(),
+                            TransactionState.COMMIT);
                     break;
                 default:
                     throw new CompilationException("Unknown datatype " + 
dd.getDatasetType());
@@ -704,8 +703,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
     protected static void 
validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset)
             throws CompilationException {
         StringBuilder builder = null;
-        ActiveNotificationHandler activeEventHandler =
-                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+        ActiveNotificationHandler activeEventHandler = 
(ActiveNotificationHandler) appCtx
+                .getActiveNotificationHandler();
         IActiveEntityEventsListener[] listeners = 
activeEventHandler.getEventListeners();
         for (IActiveEntityEventsListener listener : listeners) {
             if (listener.isEntityUsingDataset(dataset) && listener.isActive()) 
{
@@ -808,8 +807,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
 
             for (Pair<List<String>, IndexedTypeExpression> fieldExpr : 
stmtCreateIndex.getFieldExprs()) {
                 IAType fieldType = null;
-                ARecordType subType =
-                        KeyFieldTypeUtil.chooseSource(keySourceIndicators, 
keyIndex, aRecordType, metaRecordType);
+                ARecordType subType = 
KeyFieldTypeUtil.chooseSource(keySourceIndicators, keyIndex, aRecordType,
+                        metaRecordType);
                 boolean isOpen = subType.isOpen();
                 int i = 0;
                 if (fieldExpr.first.size() > 1 && !isOpen) {
@@ -845,8 +844,8 @@ public class QueryTranslator extends AbstractLangTranslator 
implements IStatemen
                     if (stmtCreateIndex.hasMetaField()) {
                         throw new AlgebricksException("Typed open index can 
only be created on the record part");
                     }
-                    Map<TypeSignature, IAType> typeMap =
-                            TypeTranslator.computeTypes(mdTxnCtx, 
fieldExpr.second.getType(), indexName, dataverseName);
+                    Map<TypeSignature, IAType> typeMap = 
TypeTranslator.computeTypes(mdTxnCtx,
+                            fieldExpr.second.getType(), indexName, 
dataverseName);
                     TypeSignature typeSignature = new 
TypeSignature(dataverseName, indexName);
                     fieldType = typeMap.get(typeSignature);
                     overridesFieldTypes = true;
@@ -1061,8 +1060,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
                 bActiveTxn = true;
                 try {
-                    JobSpecification jobSpec =
-                            
ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds);
+                    JobSpecification jobSpec = 
ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider,
+                            ds);
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
                     bActiveTxn = false;
                     runJob(hcc, jobSpec, jobFlags);
@@ -1218,8 +1217,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 }
             }
             // # check whether any function in current dataverse is being used 
by others
-            List<Function> functionsInDataverse =
-                    MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, 
dataverseName);
+            List<Function> functionsInDataverse = 
MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx,
+                    dataverseName);
             for (Function function : functionsInDataverse) {
                 if (checkWhetherFunctionIsBeingUsed(mdTxnCtx, 
function.getDataverseName(), function.getName(),
                         function.getArity(), dataverseName)) {
@@ -1230,8 +1229,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
             // # disconnect all feeds from any datasets in the dataverse.
-            ActiveNotificationHandler activeEventHandler =
-                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+            ActiveNotificationHandler activeEventHandler = 
(ActiveNotificationHandler) appCtx
+                    .getActiveNotificationHandler();
             IActiveEntityEventsListener[] activeListeners = 
activeEventHandler.getEventListeners();
             for (IActiveEntityEventsListener listener : activeListeners) {
                 EntityId activeEntityId = listener.getEntityId();
@@ -1259,15 +1258,15 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 String datasetName = dataset.getDatasetName();
                 DatasetType dsType = dataset.getDatasetType();
                 if (dsType == DatasetType.INTERNAL) {
-                    List<Index> indexes =
-                            
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, 
datasetName);
+                    List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
+                            datasetName);
                     for (Index index : indexes) {
                         
jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, 
dataset));
                     }
                 } else {
                     // External dataset
-                    List<Index> indexes =
-                            
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, 
datasetName);
+                    List<Index> indexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
+                            datasetName);
                     for (int k = 0; k < indexes.size(); k++) {
                         if 
(ExternalIndexingOperations.isFileIndex(indexes.get(k))) {
                             jobsToExecute.add(
@@ -1375,8 +1374,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
     public static void doDropDataset(String dataverseName, String datasetName, 
MetadataProvider metadataProvider,
             boolean ifExists, IHyracksClientConnection hcc, boolean 
dropCorrespondingNodeGroup) throws Exception {
         MutableObject<ProgressState> progress = new 
MutableObject<>(ProgressState.NO_PROGRESS);
-        MutableObject<MetadataTransactionContext> mdTxnCtx =
-                new 
MutableObject<>(MetadataManager.INSTANCE.beginTransaction());
+        MutableObject<MetadataTransactionContext> mdTxnCtx = new 
MutableObject<>(
+                MetadataManager.INSTANCE.beginTransaction());
         MutableBoolean bActiveTxn = new MutableBoolean(true);
         metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue());
         List<JobSpecification> jobsToExecute = new ArrayList<>();
@@ -1451,8 +1450,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 throw new AlgebricksException(
                         "There is no dataset with this name " + datasetName + 
" in dataverse " + dataverseName);
             }
-            ActiveNotificationHandler activeEventHandler =
-                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+            ActiveNotificationHandler activeEventHandler = 
(ActiveNotificationHandler) appCtx
+                    .getActiveNotificationHandler();
             IActiveEntityEventsListener[] listeners = 
activeEventHandler.getEventListeners();
             StringBuilder builder = null;
             for (IActiveEntityEventsListener listener : listeners) {
@@ -1521,8 +1520,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 }
                 // #. prepare a job to drop the index in NC.
                 jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, 
metadataProvider, ds));
-                List<Index> datasetIndexes =
-                        MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, 
dataverseName, datasetName);
+                List<Index> datasetIndexes = 
MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName,
+                        datasetName);
                 if (datasetIndexes.size() == 2) {
                     dropFilesIndex = true;
                     // only one index + the files index, we need to delete 
both of the indexes
@@ -1763,9 +1762,9 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         MetadataLockUtil.modifyDatasetBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + datasetName);
         try {
-            CompiledLoadFromFileStatement cls =
-                    new CompiledLoadFromFileStatement(dataverseName, 
loadStmt.getDatasetName().getValue(),
-                            loadStmt.getAdapter(), loadStmt.getProperties(), 
loadStmt.dataIsAlreadySorted());
+            CompiledLoadFromFileStatement cls = new 
CompiledLoadFromFileStatement(dataverseName,
+                    loadStmt.getDatasetName().getValue(), 
loadStmt.getAdapter(), loadStmt.getProperties(),
+                    loadStmt.dataIsAlreadySorted());
             JobSpecification spec = apiFramework.compileQuery(hcc, 
metadataProvider, null, 0, null, sessionOutput, cls);
             MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
             bActiveTxn = false;
@@ -1882,8 +1881,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             throws RemoteException, AlgebricksException, ACIDException {
 
         // Query Rewriting (happens under the same ongoing metadata 
transaction)
-        Pair<IReturningStatement, Integer> rewrittenResult =
-                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
query, sessionOutput);
+        Pair<IReturningStatement, Integer> rewrittenResult = 
apiFramework.reWriteQuery(declaredFunctions,
+                metadataProvider, query, sessionOutput);
 
         // Query Compilation (happens under the same ongoing metadata 
transaction)
         return apiFramework.compileQuery(clusterInfoCollector, 
metadataProvider, (Query) rewrittenResult.first,
@@ -1896,8 +1895,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
 
         // Insert/upsert statement rewriting (happens under the same ongoing 
metadata
         // transaction)
-        Pair<IReturningStatement, Integer> rewrittenResult =
-                apiFramework.reWriteQuery(declaredFunctions, metadataProvider, 
insertUpsert, sessionOutput);
+        Pair<IReturningStatement, Integer> rewrittenResult = 
apiFramework.reWriteQuery(declaredFunctions,
+                metadataProvider, insertUpsert, sessionOutput);
 
         InsertStatement rewrittenInsertUpsert = (InsertStatement) 
rewrittenResult.first;
         String dataverseName = 
getActiveDataverse(rewrittenInsertUpsert.getDataverseName());
@@ -1969,8 +1968,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         try {
             mdTxnCtx = MetadataManager.INSTANCE.beginTransaction();
             metadataProvider.setMetadataTxnContext(mdTxnCtx);
-            FeedPolicyEntity feedPolicy =
-                    
MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(),
 dataverse, policy);
+            FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE
+                    .getFeedPolicy(metadataProvider.getMetadataTxnContext(), 
dataverse, policy);
             if (feedPolicy != null) {
                 if (cfps.getIfNotExists()) {
                     MetadataManager.INSTANCE.commitTransaction(mdTxnCtx);
@@ -2048,10 +2047,10 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             throws Exception {
         MetadataTransactionContext mdTxnCtx = 
metadataProvider.getMetadataTxnContext();
         EntityId feedId = feed.getFeedId();
-        ActiveNotificationHandler activeNotificationHandler =
-                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
-        ActiveEntityEventsListener listener =
-                (ActiveEntityEventsListener) 
activeNotificationHandler.getListener(feedId);
+        ActiveNotificationHandler activeNotificationHandler = 
(ActiveNotificationHandler) appCtx
+                .getActiveNotificationHandler();
+        ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeNotificationHandler
+                .getListener(feedId);
         if (listener != null && listener.getState() != ActivityState.STOPPED) {
             throw new AlgebricksException("Feed " + feedId
                     + " is currently active and connected to the following 
dataset(s) \n" + listener.toString());
@@ -2120,15 +2119,15 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 String fqName = feedConnection.getDataverseName() + "." + 
feedConnection.getDatasetName();
                 
lockManager.acquireDatasetReadLock(metadataProvider.getLocks(), fqName);
             }
-            ActiveNotificationHandler activeEventHandler =
-                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+            ActiveNotificationHandler activeEventHandler = 
(ActiveNotificationHandler) appCtx
+                    .getActiveNotificationHandler();
             ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeEventHandler.getListener(entityId);
             if (listener == null) {
                 // Prepare policy
                 List<Dataset> datasets = new ArrayList<>();
                 for (FeedConnection connection : feedConnections) {
-                    Dataset ds =
-                            
metadataProvider.findDataset(connection.getDataverseName(), 
connection.getDatasetName());
+                    Dataset ds = 
metadataProvider.findDataset(connection.getDataverseName(),
+                            connection.getDatasetName());
                     datasets.add(ds);
                 }
                 listener = new FeedEventsListener(this, 
metadataProvider.getApplicationContext(), hcc, entityId,
@@ -2153,8 +2152,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         String dataverseName = getActiveDataverse(sfst.getDataverseName());
         String feedName = sfst.getFeedName().getValue();
         EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, 
feedName);
-        ActiveNotificationHandler activeEventHandler =
-                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+        ActiveNotificationHandler activeEventHandler = 
(ActiveNotificationHandler) appCtx
+                .getActiveNotificationHandler();
         // Obtain runtime info from ActiveListener
         ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeEventHandler.getListener(entityId);
         if (listener == null) {
@@ -2180,8 +2179,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         // TODO: Check whether we are connecting a change feed to a non-meta 
dataset
         // Check whether feed is alive
-        ActiveNotificationHandler activeEventHandler =
-                (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+        ActiveNotificationHandler activeEventHandler = 
(ActiveNotificationHandler) appCtx
+                .getActiveNotificationHandler();
         // Transaction handling
         MetadataLockUtil.connectFeedBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + datasetName, dataverseName + "." + 
feedName);
@@ -2234,8 +2233,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         MetadataLockUtil.disconnectFeedBegin(lockManager, 
metadataProvider.getLocks(), dataverseName,
                 dataverseName + "." + datasetName, dataverseName + "." + 
cfs.getFeedName());
         try {
-            ActiveNotificationHandler activeEventHandler =
-                    (ActiveNotificationHandler) 
appCtx.getActiveNotificationHandler();
+            ActiveNotificationHandler activeEventHandler = 
(ActiveNotificationHandler) appCtx
+                    .getActiveNotificationHandler();
             // Check whether feed is alive
             ActiveEntityEventsListener listener = (ActiveEntityEventsListener) 
activeEventHandler
                     .getListener(new EntityId(Feed.EXTENSION_NAME, 
dataverseName, feedName));
@@ -2290,8 +2289,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 throw new AlgebricksException(
                         "Cannot compact the extrenal dataset " + datasetName + 
" because it has no indexes");
             }
-            Dataverse dataverse =
-                    
MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), 
dataverseName);
+            Dataverse dataverse = 
MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(),
+                    dataverseName);
             jobsToExecute.add(DatasetUtil.compactDatasetJobSpec(dataverse, 
datasetName, metadataProvider));
 
             if (ds.getDatasetType() == DatasetType.INTERNAL) {
@@ -2421,8 +2420,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
     }
 
     private void updateJobStats(JobId jobId, Stats stats) {
-        final IJobManager jobManager =
-                ((ClusterControllerService) 
appCtx.getServiceContext().getControllerService()).getJobManager();
+        final IJobManager jobManager = ((ClusterControllerService) 
appCtx.getServiceContext().getControllerService())
+                .getJobManager();
         final JobRun run = jobManager.get(jobId);
         if (run == null || run.getStatus() != JobStatus.TERMINATED) {
             return;
@@ -2790,8 +2789,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         String dataverseNameTo = 
getActiveDataverse(pregelixStmt.getDataverseNameTo());
         String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue();
         String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue();
-        String fullyQualifiedDatasetNameTo =
-                DatasetUtil.isFullyQualifiedName(datasetNameTo) ? 
datasetNameTo : dataverseNameTo + '.' + datasetNameTo;
+        String fullyQualifiedDatasetNameTo = 
DatasetUtil.isFullyQualifiedName(datasetNameTo) ? datasetNameTo
+                : dataverseNameTo + '.' + datasetNameTo;
         MetadataTransactionContext mdTxnCtx = 
MetadataManager.INSTANCE.beginTransaction();
         metadataProvider.setMetadataTxnContext(mdTxnCtx);
         MetadataLockUtil.insertDeleteUpsertBegin(lockManager, 
metadataProvider.getLocks(), fullyQualifiedDatasetNameTo);
@@ -2874,8 +2873,8 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                 throw new AlgebricksException("Tried to access non-existing 
dataset: " + datasetNameTo);
             }
             // Cleans up the sink dataset -- Drop and then Create.
-            DropDatasetStatement dropStmt =
-                    new DropDatasetStatement(new Identifier(dataverseNameTo), 
pregelixStmt.getDatasetNameTo(), true);
+            DropDatasetStatement dropStmt = new DropDatasetStatement(new 
Identifier(dataverseNameTo),
+                    pregelixStmt.getDatasetNameTo(), true);
             this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc, 
null);
             IDatasetDetailsDecl idd = new 
InternalDetailsDecl(toIndex.getKeyFieldNames(),
                     toIndex.getKeyFieldSourceIndicators(), false, null);
@@ -2969,7 +2968,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             String fromDatasetName, String toDataverseName, String 
toDatasetName) {
         // Constructs AsterixDB parameters, e.g., URL, source dataset and sink 
dataset.
         ExternalProperties externalProperties = appCtx.getExternalProperties();
-        String clientIP = 
ClusterProperties.INSTANCE.getCluster().getMasterNode().getClientIp();
+        String clientIP = 
appCtx.getServiceContext().getCCContext().getClusterControllerInfo().getClientNetAddress();
         StringBuilder asterixdbParameterBuilder = new StringBuilder();
         asterixdbParameterBuilder.append(
                 "pregelix.asterixdb.url=" + "http://"; + clientIP + ":" + 
externalProperties.getAPIServerPort() + ",");

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
deleted file mode 100644
index c9b3565..0000000
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.drivers;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.Reader;
-import java.util.List;
-
-import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil;
-import org.apache.asterix.api.java.AsterixJavaClient;
-import org.apache.asterix.app.translator.DefaultStatementExecutorFactory;
-import org.apache.asterix.common.config.GlobalConfig;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.compiler.provider.AqlCompilationProvider;
-import org.apache.asterix.compiler.provider.ILangCompilationProvider;
-import org.apache.asterix.file.StorageComponentProvider;
-import org.apache.commons.io.FileUtils;
-import org.kohsuke.args4j.Argument;
-import org.kohsuke.args4j.CmdLineParser;
-import org.kohsuke.args4j.Option;
-
-public class AsterixCLI {
-    private static class Options {
-        @Option(name = "-properties", usage = "Name of properties file", 
required = true)
-        public String properties;
-
-        @Option(name = "-output", usage = "Output folder to place results", 
required = true)
-        public String outputFolder;
-
-        @Argument(usage = "AQL Files to run", multiValued = true, required = 
true)
-        public List<String> args;
-    }
-
-    private static AsterixHyracksIntegrationUtil integrationUtil = new 
AsterixHyracksIntegrationUtil();
-
-    public static void main(String args[]) throws Exception {
-        Options options = new Options();
-        CmdLineParser parser = new CmdLineParser(options);
-        parser.parseArgument(args);
-        ILangCompilationProvider compilationProvider = new 
AqlCompilationProvider();
-        setUp(options);
-        try {
-            for (String queryFile : options.args) {
-                Reader in = new FileReader(queryFile);
-                AsterixJavaClient ajc =
-                        new AsterixJavaClient((ICcApplicationContext) 
integrationUtil.cc.getApplicationContext(),
-                        integrationUtil.getHyracksClientConnection(), in,
-                        compilationProvider, new 
DefaultStatementExecutorFactory(), new StorageComponentProvider());
-                try {
-                    ajc.compile(true, false, false, false, false, true, false);
-                } finally {
-                    in.close();
-                }
-                ajc.execute();
-            }
-        } finally {
-            tearDown();
-        }
-        System.exit(0);
-    }
-
-    public static void setUp(Options options) throws Exception {
-        System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, 
options.properties);
-        File outdir = new File(options.outputFolder);
-        outdir.mkdirs();
-
-        File log = new File("asterix_logs");
-        if (log.exists()) {
-            FileUtils.deleteDirectory(log);
-        }
-        File lsn = new File("last_checkpoint_lsn");
-        lsn.deleteOnExit();
-
-        integrationUtil.init(false);
-    }
-
-    public static void tearDown() throws Exception {
-        integrationUtil.deinit(false);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
index 8283257..40d8996 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java
@@ -26,6 +26,7 @@ import static 
org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNEC
 import static 
org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.ConcurrentMap;
@@ -58,17 +59,19 @@ import org.apache.asterix.app.cc.CCExtensionManager;
 import org.apache.asterix.app.external.ExternalLibraryUtils;
 import org.apache.asterix.app.replication.FaultToleranceStrategyFactory;
 import org.apache.asterix.common.api.AsterixThreadFactory;
-import org.apache.asterix.common.api.IClusterManagementWork.ClusterState;
+import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.api.INodeJobTracker;
 import org.apache.asterix.common.config.AsterixExtension;
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.config.ExternalProperties;
 import org.apache.asterix.common.config.MetadataProperties;
+import org.apache.asterix.common.config.PropertiesAccessor;
+import org.apache.asterix.common.config.ReplicationProperties;
 import org.apache.asterix.common.context.IStorageComponentProvider;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
 import org.apache.asterix.common.library.ILibraryManager;
 import org.apache.asterix.common.replication.IFaultToleranceStrategy;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.replication.ReplicationStrategyFactory;
 import org.apache.asterix.common.utils.Servlets;
 import org.apache.asterix.external.library.ExternalLibraryManager;
 import org.apache.asterix.file.StorageComponentProvider;
@@ -76,7 +79,6 @@ import org.apache.asterix.messaging.CCMessageBroker;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.api.IAsterixStateProxy;
 import org.apache.asterix.metadata.bootstrap.AsterixStateProxy;
-import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.metadata.lock.MetadataLockManager;
 import org.apache.asterix.runtime.job.resource.JobCapacityController;
 import org.apache.asterix.runtime.utils.CcApplicationContext;
@@ -122,8 +124,8 @@ public class CCApplication extends BaseCCApplication {
         if (args.length > 0) {
             throw new IllegalArgumentException("Unrecognized argument(s): " + 
Arrays.toString(args));
         }
-        final ClusterControllerService controllerService =
-                (ClusterControllerService) ccServiceCtx.getControllerService();
+        final ClusterControllerService controllerService = 
(ClusterControllerService) ccServiceCtx
+                .getControllerService();
         ccServiceCtx.setMessageBroker(new CCMessageBroker(controllerService));
 
         
configureLoggingLevel(ccServiceCtx.getAppConfig().getLoggingLevel(ExternalProperties.Option.LOG_LEVEL));
@@ -136,16 +138,19 @@ public class CCApplication extends BaseCCApplication {
         int port = 
ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort();
         hcc = new HyracksConnection(strIP, port);
         ILibraryManager libraryManager = new ExternalLibraryManager();
-
-        IReplicationStrategy repStrategy = 
ClusterProperties.INSTANCE.getReplicationStrategy();
-        IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory
-                .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, 
ccServiceCtx);
+        ReplicationProperties repProp = new ReplicationProperties(
+                PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig()));
+        IReplicationStrategy repStrategy = 
ReplicationStrategyFactory.create(repProp.getReplicationStrategy(), repProp,
+                getConfigManager());
+        IFaultToleranceStrategy ftStrategy = 
FaultToleranceStrategyFactory.create(ccServiceCtx, repProp, repStrategy);
         ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false);
         componentProvider = new StorageComponentProvider();
         GlobalRecoveryManager globalRecoveryManager = 
createGlobalRecoveryManager();
         statementExecutorCtx = new StatementExecutorContext();
         appCtx = createApplicationContext(libraryManager, 
globalRecoveryManager, ftStrategy);
-        ccExtensionManager = new CCExtensionManager(getExtensions());
+        List<AsterixExtension> extensions = new ArrayList<>();
+        extensions.addAll(this.getExtensions());
+        ccExtensionManager = new CCExtensionManager(extensions);
         appCtx.setExtensionManager(ccExtensionManager);
         final CCConfig ccConfig = controllerService.getCCConfig();
         if (System.getProperty("java.rmi.server.hostname") == null) {
@@ -162,7 +167,6 @@ public class CCApplication extends BaseCCApplication {
         webManager = new WebManager();
         configureServers();
         webManager.start();
-        
ClusterManagerProvider.getClusterManager().registerSubscriber(globalRecoveryManager);
         ccServiceCtx.addClusterLifecycleListener(new 
ClusterLifecycleListener(appCtx));
         final INodeJobTracker nodeJobTracker = appCtx.getNodeJobTracker();
         ccServiceCtx.addJobLifecycleListener(nodeJobTracker);
@@ -222,8 +226,8 @@ public class CCApplication extends BaseCCApplication {
     }
 
     protected HttpServer setupJSONAPIServer(ExternalProperties 
externalProperties) throws Exception {
-        HttpServer jsonAPIServer =
-                new HttpServer(webManager.getBosses(), 
webManager.getWorkers(), externalProperties.getAPIServerPort());
+        HttpServer jsonAPIServer = new HttpServer(webManager.getBosses(), 
webManager.getWorkers(),
+                externalProperties.getAPIServerPort());
         jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc);
         jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx);
         jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR,
@@ -346,8 +350,7 @@ public class CCApplication extends BaseCCApplication {
     @Override
     public void startupCompleted() throws Exception {
         ccServiceCtx.getControllerService().getExecutor().submit(() -> {
-            appCtx.getClusterStateManager().waitForState(ClusterState.ACTIVE);
-            
ClusterManagerProvider.getClusterManager().notifyStartupCompleted();
+            
appCtx.getClusterStateManager().waitForState(IClusterManagementWork.ClusterState.ACTIVE);
             return null;
         });
     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
index 8f479eb..217d6e2 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java
@@ -28,18 +28,14 @@ import java.util.concurrent.LinkedBlockingQueue;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 
-import org.apache.asterix.common.api.IClusterEventsSubscriber;
 import org.apache.asterix.common.api.IClusterManagementWork;
 import org.apache.asterix.common.api.IClusterManagementWorkResponse;
 import org.apache.asterix.common.api.IClusterManagementWorkResponse.Status;
 import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.config.ClusterProperties;
 import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.metadata.MetadataManager;
 import org.apache.asterix.metadata.cluster.AddNodeWork;
 import org.apache.asterix.metadata.cluster.AddNodeWorkResponse;
-import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.metadata.cluster.RemoveNodeWork;
 import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse;
 import org.apache.hyracks.api.application.IClusterLifecycleListener;
@@ -56,7 +52,7 @@ public class ClusterLifecycleListener implements 
IClusterLifecycleListener {
 
     public ClusterLifecycleListener(ICcApplicationContext appCtx) {
         this.appCtx = appCtx;
-        eventHandler = new ClusterWorkExecutor(appCtx, workRequestQueue);
+        eventHandler = new ClusterWorkExecutor(workRequestQueue);
         Thread t = new Thread(eventHandler);
         if (LOGGER.isLoggable(Level.INFO)) {
             LOGGER.info("Starting cluster event handler");
@@ -80,17 +76,6 @@ public class ClusterLifecycleListener implements 
IClusterLifecycleListener {
         Set<String> nodeAddition = new HashSet<>();
         nodeAddition.add(nodeId);
         updateProgress(ClusterEventType.NODE_JOIN, nodeAddition);
-        Set<IClusterEventsSubscriber> subscribers =
-                
ClusterManagerProvider.getClusterManager().getRegisteredClusterEventSubscribers();
-        Set<IClusterManagementWork> work = new HashSet<>();
-        for (IClusterEventsSubscriber sub : subscribers) {
-            Set<IClusterManagementWork> workRequest = 
sub.notifyNodeJoin(nodeId);
-            work.addAll(workRequest);
-        }
-        if (!work.isEmpty()) {
-            executeWorkSet(work);
-        }
-
     }
 
     @Override
@@ -108,13 +93,7 @@ public class ClusterLifecycleListener implements 
IClusterLifecycleListener {
             }
         }
         updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds);
-        Set<IClusterEventsSubscriber> subscribers =
-                
ClusterManagerProvider.getClusterManager().getRegisteredClusterEventSubscribers();
         Set<IClusterManagementWork> work = new HashSet<>();
-        for (IClusterEventsSubscriber sub : subscribers) {
-            Set<IClusterManagementWork> workRequest = 
sub.notifyNodeFailure(deadNodeIds);
-            work.addAll(workRequest);
-        }
         if (!work.isEmpty()) {
             executeWorkSet(work);
         }
@@ -163,38 +142,14 @@ public class ClusterLifecycleListener implements 
IClusterLifecycleListener {
                 case REMOVE_NODE:
                     nodesToRemove.addAll(((RemoveNodeWork) 
w).getNodesToBeRemoved());
                     nodeRemovalRequests.add(w);
-                    RemoveNodeWorkResponse response =
-                            new RemoveNodeWorkResponse((RemoveNodeWork) w, 
Status.IN_PROGRESS);
+                    RemoveNodeWorkResponse response = new 
RemoveNodeWorkResponse((RemoveNodeWork) w,
+                            Status.IN_PROGRESS);
                     pendingWorkResponses.add(response);
                     break;
             }
         }
 
         List<String> addedNodes = new ArrayList<>();
-        String asterixInstanceName = 
ClusterProperties.INSTANCE.getCluster().getInstanceName();
-        IClusterStateManager csm = appCtx.getClusterStateManager();
-        for (int i = 0; i < nodesToAdd; i++) {
-            Node node = csm.getAvailableSubstitutionNode();
-            if (node != null) {
-                try {
-                    ClusterManagerProvider.getClusterManager().addNode(appCtx, 
node);
-                    addedNodes.add(asterixInstanceName + "_" + node.getId());
-                    if (LOGGER.isLoggable(Level.INFO)) {
-                        LOGGER.info("Added NC at:" + node.getId());
-                    }
-                } catch (Exception e) {
-                    if (LOGGER.isLoggable(Level.WARNING)) {
-                        LOGGER.warning("Unable to add NC at:" + node.getId());
-                    }
-                    e.printStackTrace();
-                }
-            } else {
-                if (LOGGER.isLoggable(Level.WARNING)) {
-                    LOGGER.warning("Unable to add NC: no more available 
nodes");
-                }
-
-            }
-        }
 
         for (AddNodeWork w : nodeAdditionRequests) {
             int n = w.getNumberOfNodesRequested();

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
index 2977a58..bcc1c60 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java
@@ -25,23 +25,16 @@ import java.util.logging.Level;
 import java.util.logging.Logger;
 
 import org.apache.asterix.common.api.IClusterManagementWork;
-import org.apache.asterix.common.cluster.IClusterStateManager;
-import org.apache.asterix.common.dataflow.ICcApplicationContext;
-import org.apache.asterix.common.exceptions.AsterixException;
-import org.apache.asterix.event.schema.cluster.Node;
 import org.apache.asterix.metadata.cluster.AddNodeWork;
-import org.apache.asterix.metadata.cluster.ClusterManagerProvider;
 import org.apache.asterix.metadata.cluster.RemoveNodeWork;
 
 public class ClusterWorkExecutor implements Runnable {
 
     private static final Logger LOGGER = 
Logger.getLogger(ClusterWorkExecutor.class.getName());
 
-    private final ICcApplicationContext appCtx;
     private final LinkedBlockingQueue<Set<IClusterManagementWork>> inbox;
 
-    public ClusterWorkExecutor(ICcApplicationContext appCtx, 
LinkedBlockingQueue<Set<IClusterManagementWork>> inbox) {
-        this.appCtx = appCtx;
+    public 
ClusterWorkExecutor(LinkedBlockingQueue<Set<IClusterManagementWork>> inbox) {
         this.inbox = inbox;
     }
 
@@ -69,30 +62,6 @@ public class ClusterWorkExecutor implements Runnable {
                     }
                 }
 
-                IClusterStateManager csm = appCtx.getClusterStateManager();
-                Set<Node> addedNodes = new HashSet<>();
-                for (int i = 0; i < nodesToAdd; i++) {
-                    Node node = csm.getAvailableSubstitutionNode();
-                    if (node != null) {
-                        try {
-                            
ClusterManagerProvider.getClusterManager().addNode(appCtx, node);
-                            addedNodes.add(node);
-                            if (LOGGER.isLoggable(Level.INFO)) {
-                                LOGGER.info("Added NC at:" + node.getId());
-                            }
-                        } catch (AsterixException e) {
-                            if (LOGGER.isLoggable(Level.WARNING)) {
-                                LOGGER.warning("Unable to add NC at:" + 
node.getId());
-                            }
-                            e.printStackTrace();
-                        }
-                    } else {
-                        if (LOGGER.isLoggable(Level.WARNING)) {
-                            LOGGER.warning("Unable to add NC: no more 
available nodes");
-                        }
-                    }
-                }
-
             } catch (InterruptedException e) {
                 if (LOGGER.isLoggable(Level.SEVERE)) {
                     LOGGER.severe("interruped" + e.getMessage());

Reply via email to