http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java index 98dabc9..ea32e94 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -40,10 +40,12 @@ import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.hyracks.bootstrap.CCApplication; import org.apache.asterix.hyracks.bootstrap.NCApplication; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.application.ICCApplication; import org.apache.hyracks.api.application.INCApplication; import org.apache.hyracks.api.client.HyracksConnection; import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.config.IOption; import org.apache.hyracks.control.cc.ClusterControllerService; import org.apache.hyracks.control.common.config.ConfigManager; import org.apache.hyracks.control.common.controllers.CCConfig; @@ -52,30 +54,55 @@ import org.apache.hyracks.control.common.controllers.NCConfig; import org.apache.hyracks.control.nc.NodeControllerService; import org.kohsuke.args4j.CmdLineException; +@SuppressWarnings({"squid:ClassVariableVisibilityCheck","squid:S00112"}) public class AsterixHyracksIntegrationUtil { - static class LoggerHolder { - static final Logger LOGGER = Logger.getLogger(AsterixHyracksIntegrationUtil.class.getName()); - - private LoggerHolder() { - } - } - public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098; public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099; - + public static final String DEFAULT_CONF_FILE = joinPath("asterixdb", "asterix-app", "src", "test", "resources", + "cc.conf"); + private static final String DEFAULT_STORAGE_PATH = joinPath("target", "io", "dir"); + private static String storagePath = DEFAULT_STORAGE_PATH; public ClusterControllerService cc; - public NodeControllerService[] ncs = new NodeControllerService[0]; + public NodeControllerService[] ncs = new NodeControllerService[2]; public IHyracksClientConnection hcc; protected boolean gracefulShutdown = true; - - private static final String DEFAULT_STORAGE_PATH = joinPath("target", "io", "dir"); - private static String storagePath = DEFAULT_STORAGE_PATH; + List<Pair<IOption, Object>> opts = new ArrayList<>(); private ConfigManager configManager; private List<String> nodeNames; - public void init(boolean deleteOldInstanceData) throws Exception { + public static void setStoragePath(String path) { + storagePath = path; + } + + public static void restoreDefaultStoragePath() { + storagePath = DEFAULT_STORAGE_PATH; + } + + /** + * main method to run a simple 2 node cluster in-process + * suggested VM arguments: <code>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8</code> + * + * @param args + * unused + */ + public static void main(String[] args) throws Exception { + AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + try { + integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"), + System.getProperty("external.lib", ""), System.getProperty("conf.path", DEFAULT_CONF_FILE)); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Unexpected exception", e); + System.exit(1); + } + } + + public void init(boolean deleteOldInstanceData, String confFile) throws Exception { //NOSONAR final ICCApplication ccApplication = createCCApplication(); - configManager = new ConfigManager(); + if (confFile == null) { + configManager = new ConfigManager(); + } else { + configManager = new ConfigManager(new String[] { "-config-file", confFile }); + } ccApplication.registerConfig(configManager); final CCConfig ccConfig = createCCConfig(configManager); cc = new ClusterControllerService(ccConfig, ccApplication); @@ -90,12 +117,18 @@ public class AsterixHyracksIntegrationUtil { // mark this NC as virtual in the CC's config manager, so he doesn't try to contact NCService... configManager.set(nodeId, NCConfig.Option.NCSERVICE_PORT, NCConfig.NCSERVICE_PORT_DISABLED); final INCApplication ncApplication = createNCApplication(); - ConfigManager ncConfigManager = new ConfigManager(); + ConfigManager ncConfigManager; + if (confFile == null) { + ncConfigManager = new ConfigManager(); + } else { + ncConfigManager = new ConfigManager(new String[] { "-config-file", confFile }); + } ncApplication.registerConfig(ncConfigManager); nodeControllers.add( new NodeControllerService(fixupIODevices(createNCConfig(nodeId, ncConfigManager)), ncApplication)); - } + } ; + opts.stream().forEach(opt -> configManager.set(opt.getLeft(), opt.getRight())); cc.start(); // Starts ncs. @@ -125,11 +158,11 @@ public class AsterixHyracksIntegrationUtil { this.ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]); } - public void init(boolean deleteOldInstanceData, String externalLibPath) throws Exception { + public void init(boolean deleteOldInstanceData, String externalLibPath, String confDir) throws Exception { List<ILibraryManager> libraryManagers = new ArrayList<>(); ExternalUDFLibrarian librarian = new ExternalUDFLibrarian(libraryManagers); librarian.cleanup(); - init(deleteOldInstanceData); + init(deleteOldInstanceData, confDir); if (externalLibPath != null && externalLibPath.length() != 0) { libraryManagers.add(((ICcApplicationContext) cc.getApplicationContext()).getLibraryManager()); for (NodeControllerService nc : ncs) { @@ -246,18 +279,10 @@ public class AsterixHyracksIntegrationUtil { } } - public static void setStoragePath(String path) { - storagePath = path; - } - public void setGracefulShutdown(boolean gracefulShutdown) { this.gracefulShutdown = gracefulShutdown; } - public static void restoreDefaultStoragePath() { - storagePath = DEFAULT_STORAGE_PATH; - } - protected String getDefaultStoragePath() { return storagePath; } @@ -280,21 +305,22 @@ public class AsterixHyracksIntegrationUtil { } } - /** - * main method to run a simple 2 node cluster in-process - * suggested VM arguments: <code>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8</code> - * - * @param args - * unused - */ - public static void main(String[] args) throws Exception { - AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); - try { - integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"), - System.getProperty("external.lib", "")); - } catch (Exception e) { - LOGGER.log(Level.WARNING, "Unexpected exception", e); - System.exit(1); + protected void run(boolean cleanupOnStart, boolean cleanupOnShutdown, String loadExternalLibs, String confFile) + throws Exception { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + deinit(cleanupOnShutdown); + } catch (Exception e) { + LOGGER.log(Level.WARNING, "Unexpected exception on shutdown", e); + } + } + }); + + init(cleanupOnStart, loadExternalLibs, confFile); + while (true) { + Thread.sleep(10000); } } @@ -309,7 +335,6 @@ public class AsterixHyracksIntegrationUtil { } } }); - System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, "asterix-build-configuration.xml"); init(cleanupOnStart, loadExternalLibs); while (true) { @@ -317,6 +342,17 @@ public class AsterixHyracksIntegrationUtil { } } + public void addOption(IOption name, Object value) { + opts.add(Pair.of(name, value)); + } + + static class LoggerHolder { + static final Logger LOGGER = Logger.getLogger(AsterixHyracksIntegrationUtil.class.getName()); + + private LoggerHolder() { + } + } + private class UngracefulShutdownNCApplication extends NCApplication { @Override public void stop() throws Exception { @@ -324,4 +360,5 @@ public class AsterixHyracksIntegrationUtil { webManager.stop(); } } + }
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java index b1d2159..75207e8 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/external/ExternalUDFLibrarian.java @@ -19,18 +19,25 @@ package org.apache.asterix.app.external; import java.io.File; +import java.io.FileOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.rmi.RemoteException; +import java.util.Enumeration; import java.util.HashMap; import java.util.List; +import java.util.zip.ZipEntry; +import java.util.zip.ZipFile; import org.apache.asterix.common.exceptions.ACIDException; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.event.service.AsterixEventServiceUtil; +import org.apache.commons.compress.utils.IOUtils; import org.apache.commons.io.FileUtils; import org.apache.hyracks.algebricks.common.utils.Pair; +@SuppressWarnings("squid:S134") public class ExternalUDFLibrarian implements IExternalUDFLibrarian { // The following list includes a library manager for the CC @@ -41,6 +48,38 @@ public class ExternalUDFLibrarian implements IExternalUDFLibrarian { this.libraryManagers = libraryManagers; } + public static void removeLibraryDir() throws IOException { + File installLibDir = ExternalLibraryUtils.getLibraryInstallDir(); + FileUtils.deleteQuietly(installLibDir); + } + + public static void unzip(String sourceFile, String outputDir) throws IOException { + if (System.getProperty("os.name").toLowerCase().startsWith("win")) { + try (ZipFile zipFile = new ZipFile(sourceFile)) { + Enumeration<? extends ZipEntry> entries = zipFile.entries(); + while (entries.hasMoreElements()) { + ZipEntry entry = entries.nextElement(); + File entryDestination = new File(outputDir, entry.getName()); + if (!entry.isDirectory()) { + entryDestination.getParentFile().mkdirs(); + try (InputStream in = zipFile.getInputStream(entry); + OutputStream out = new FileOutputStream(entryDestination)) { + IOUtils.copy(in, out); + } + } + } + } + } else { + Process process = new ProcessBuilder("unzip", "-d", outputDir, sourceFile).start(); + try { + process.waitFor(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new IOException(e); + } + } + } + @Override public void install(String dvName, String libName, String libPath) throws Exception { // get the directory of the to be installed libraries @@ -55,7 +94,7 @@ public class ExternalUDFLibrarian implements IExternalUDFLibrarian { FileUtils.deleteQuietly(destinationDir); destinationDir.mkdirs(); try { - AsterixEventServiceUtil.unzip(libPath, destinationDir.getAbsolutePath()); + unzip(libPath, destinationDir.getAbsolutePath()); } catch (Exception e) { throw new Exception("Couldn't unzip the file: " + libPath, e); @@ -78,11 +117,6 @@ public class ExternalUDFLibrarian implements IExternalUDFLibrarian { } } - public static void removeLibraryDir() throws IOException { - File installLibDir = ExternalLibraryUtils.getLibraryInstallDir(); - FileUtils.deleteQuietly(installLibDir); - } - public void cleanup() throws AsterixException, RemoteException, ACIDException { for (ILibraryManager libraryManager : libraryManagers) { List<Pair<String, String>> libs = libraryManager.getAllLibraries(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java index 81b232a..df3ca64 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/NCAppRuntimeContext.java @@ -41,7 +41,6 @@ import org.apache.asterix.common.config.ActiveProperties; import org.apache.asterix.common.config.AsterixExtension; import org.apache.asterix.common.config.BuildProperties; import org.apache.asterix.common.config.CompilerProperties; -import org.apache.asterix.common.config.ExtensionProperties; import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.common.config.MessagingProperties; import org.apache.asterix.common.config.MetadataProperties; @@ -60,6 +59,7 @@ import org.apache.asterix.common.replication.IRemoteRecoveryManager; import org.apache.asterix.common.replication.IReplicaResourcesManager; import org.apache.asterix.common.replication.IReplicationChannel; import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.asterix.common.replication.Replica; import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider; import org.apache.asterix.common.storage.IReplicaManager; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; @@ -165,7 +165,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { if (extensions != null) { allExtensions.addAll(extensions); } - allExtensions.addAll(new ExtensionProperties(propertiesAccessor).getExtensions()); + allExtensions.addAll(propertiesAccessor.getExtensions()); ncExtensionManager = new NCExtensionManager(allExtensions); componentProvider = new StorageComponentProvider(); resourceIdFactory = new GlobalResourceIdFactoryProvider(ncServiceContext).createResourceIdFactory(); @@ -189,6 +189,7 @@ public class NCAppRuntimeContext implements INcApplicationContext { ILocalResourceRepositoryFactory persistentLocalResourceRepositoryFactory = new PersistentLocalResourceRepositoryFactory(ioManager, getServiceContext().getNodeId(), metadataProperties, indexCheckpointManagerProvider); + localResourceRepository = (PersistentLocalResourceRepository) persistentLocalResourceRepositoryFactory.createRepository(); @@ -219,43 +220,48 @@ public class NCAppRuntimeContext implements INcApplicationContext { activeProperties.getMemoryComponentGlobalBudget(), compilerProperties.getFrameSize(), this.ncServiceContext); - if (replicationProperties.isParticipant(getServiceContext().getNodeId())) { + if (replicationProperties.isReplicationEnabled()) { replicaResourcesManager = new ReplicaResourcesManager(localResourceRepository, metadataProperties, indexCheckpointManagerProvider); replicationManager = new ReplicationManager(nodeId, replicationProperties, replicaResourcesManager, - txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider); - - //pass replication manager to replication required object - //LogManager to replicate logs - txnSubsystem.getLogManager().setReplicationManager(replicationManager); - - //PersistentLocalResourceRepository to replicated metadata files and delete backups on drop index - localResourceRepository.setReplicationManager(replicationManager); - - /* - * add the partitions that will be replicated in this node as inactive partitions - */ - //get nodes which replicated to this node - Set<String> remotePrimaryReplicas = replicationProperties.getRemotePrimaryReplicasIds(nodeId); - for (String clientId : remotePrimaryReplicas) { - //get the partitions of each client - ClusterPartition[] clientPartitions = metadataProperties.getNodePartitions().get(clientId); - for (ClusterPartition partition : clientPartitions) { - localResourceRepository.addInactivePartition(partition.getPartitionId()); + txnSubsystem.getLogManager(), asterixAppRuntimeContextProvider, ncServiceContext); + + if (replicationManager.getReplicationStrategy().isParticipant(getServiceContext().getNodeId())) { + + //pass replication manager to replication required object + //LogManager to replicate logs + txnSubsystem.getLogManager().setReplicationManager(replicationManager); + + //PersistentLocalResourceRepository to replicate metadata files and delete backups on drop index + localResourceRepository.setReplicationManager(replicationManager); + + /* + * add the partitions that will be replicated in this node as inactive partitions + */ + //get nodes which replicate to this node + Set<String> remotePrimaryReplicas = replicationManager.getReplicationStrategy() + .getRemotePrimaryReplicas(nodeId).stream().map(Replica::getId).collect(Collectors.toSet()); + for (String clientId : remotePrimaryReplicas) { + //get the partitions of each client + ClusterPartition[] clientPartitions = metadataProperties.getNodePartitions().get(clientId); + for (ClusterPartition partition : clientPartitions) { + localResourceRepository.addInactivePartition(partition.getPartitionId()); + } } - } - //initialize replication channel - replicationChannel = new ReplicationChannel(nodeId, replicationProperties, txnSubsystem.getLogManager(), - replicaResourcesManager, replicationManager, getServiceContext(), asterixAppRuntimeContextProvider); + //initialize replication channel + replicationChannel = new ReplicationChannel(nodeId, replicationProperties, txnSubsystem.getLogManager(), + replicaResourcesManager, replicationManager, getServiceContext(), + asterixAppRuntimeContextProvider, replicationManager.getReplicationStrategy()); - remoteRecoveryManager = new RemoteRecoveryManager(replicationManager, this, replicationProperties); + remoteRecoveryManager = new RemoteRecoveryManager(replicationManager, this, replicationProperties); - bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(), - storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory(), - replicationManager); + bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(), + storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory(), + replicationManager); + } } else { bufferCache = new BufferCache(ioManager, prs, pcp, new FileMapManager(), storageProperties.getBufferCacheMaxOpenFiles(), getServiceContext().getThreadFactory()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java index 22fa459..dc22342 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/RecoveryManager.java @@ -101,9 +101,9 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { this.serviceCtx = serviceCtx; this.txnSubsystem = txnSubsystem; logMgr = (LogManager) txnSubsystem.getLogManager(); - ReplicationProperties repProperties = - txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationProperties(); - replicationEnabled = repProperties.isParticipant(txnSubsystem.getId()); + ReplicationProperties repProperties = txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext() + .getReplicationProperties(); + replicationEnabled = repProperties.isReplicationEnabled(); localResourceRepository = (PersistentLocalResourceRepository) txnSubsystem.getAsterixAppRuntimeContextProvider() .getLocalResourceRepository(); cachedEntityCommitsPerJobSize = txnSubsystem.getTransactionProperties().getJobRecoveryMemorySize(); @@ -426,16 +426,16 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { @Override public long getLocalMinFirstLSN() throws HyracksDataException { - IDatasetLifecycleManager datasetLifecycleManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); + IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); List<IIndex> openIndexList = datasetLifecycleManager.getOpenResources(); long firstLSN; //the min first lsn can only be the current append or smaller long minFirstLSN = logMgr.getAppendLSN(); if (!openIndexList.isEmpty()) { for (IIndex index : openIndexList) { - AbstractLSMIOOperationCallback ioCallback = - (AbstractLSMIOOperationCallback) ((ILSMIndex) index).getIOOperationCallback(); + AbstractLSMIOOperationCallback ioCallback = (AbstractLSMIOOperationCallback) ((ILSMIndex) index) + .getIOOperationCallback(); if (!((AbstractLSMIndex) index).isCurrentMutableComponentEmpty() || ioCallback.hasPendingFlush()) { firstLSN = ioCallback.getFirstLSN(); minFirstLSN = Math.min(minFirstLSN, firstLSN); @@ -446,8 +446,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { } private long getRemoteMinFirstLSN() throws HyracksDataException { - IReplicaResourcesManager remoteResourcesManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager(); + IReplicaResourcesManager remoteResourcesManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getAppContext().getReplicaResourcesManager(); return remoteResourcesManager.getPartitionsMinLSN(localResourceRepository.getInactivePartitions()); } @@ -566,9 +566,9 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { if (activePartitions.contains(logRecord.getResourcePartition())) { undoLSNSet = jobLoserEntity2LSNsMap.get(tempKeyTxnEntityId); if (undoLSNSet == null) { - loserEntity = - new TxnEntityId(logTxnId, logRecord.getDatasetId(), logRecord.getPKHashValue(), - logRecord.getPKValue(), logRecord.getPKValueSize(), true); + loserEntity = new TxnEntityId(logTxnId, logRecord.getDatasetId(), + logRecord.getPKHashValue(), logRecord.getPKValue(), logRecord.getPKValueSize(), + true); undoLSNSet = new LinkedList<>(); jobLoserEntity2LSNsMap.put(loserEntity, undoLSNSet); } @@ -611,8 +611,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { //undo loserTxn's effect LOGGER.log(Level.INFO, "undoing loser transaction's effect"); - IDatasetLifecycleManager datasetLifecycleManager = - txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager(); + IDatasetLifecycleManager datasetLifecycleManager = txnSubsystem.getAsterixAppRuntimeContextProvider() + .getDatasetLifecycleManager(); //TODO sort loser entities by smallest LSN to undo in one pass. Iterator<Entry<TxnEntityId, List<Long>>> iter = jobLoserEntity2LSNsMap.entrySet().iterator(); int undoCount = 0; @@ -664,8 +664,8 @@ public class RecoveryManager implements IRecoveryManager, ILifeCycleComponent { private static void undo(ILogRecord logRecord, IDatasetLifecycleManager datasetLifecycleManager) { try { - ILSMIndex index = - (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), logRecord.getResourceId()); + ILSMIndex index = (ILSMIndex) datasetLifecycleManager.getIndex(logRecord.getDatasetId(), + logRecord.getResourceId()); ILSMIndexAccessor indexAccessor = index.createAccessor(NoOpIndexAccessParameters.INSTANCE); if (logRecord.getNewOp() == AbstractIndexModificationOperationCallback.INSERT_BYTE) { indexAccessor.forceDelete(logRecord.getNewValue()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java index 47f9315..41a0c3b 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/TransactionSubsystem.java @@ -25,7 +25,6 @@ import java.util.logging.Logger; import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.config.TransactionProperties; import org.apache.asterix.common.exceptions.ACIDException; -import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.transactions.Checkpoint; import org.apache.asterix.common.transactions.CheckpointProperties; import org.apache.asterix.common.transactions.IAppRuntimeContextProvider; @@ -72,8 +71,8 @@ public class TransactionSubsystem implements ITransactionSubsystem { this.lockManager = new ConcurrentLockManager(txnProperties.getLockManagerShrinkTimer()); ReplicationProperties repProperties = asterixAppRuntimeContextProvider.getAppContext().getReplicationProperties(); - IReplicationStrategy replicationStrategy = repProperties.getReplicationStrategy(); - final boolean replicationEnabled = repProperties.isParticipant(id); + final boolean replicationEnabled = repProperties.isReplicationEnabled(); + final CheckpointProperties checkpointProperties = new CheckpointProperties(txnProperties, id); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.log(Level.INFO, "Checkpoint Properties: " + checkpointProperties); @@ -84,11 +83,7 @@ public class TransactionSubsystem implements ITransactionSubsystem { transactionManager.ensureMaxTxnId(latestCheckpoint.getMaxTxnId()); } - if (replicationEnabled) { - this.logManager = new LogManagerWithReplication(this, replicationStrategy); - } else { - this.logManager = new LogManager(this); - } + this.logManager = replicationEnabled ? new LogManagerWithReplication(this) : new LogManager(this); this.recoveryManager = new RecoveryManager(this, serviceCtx); if (this.txnProperties.isCommitProfilerEnabled()) { ecp = new EntityCommitProfiler(this, this.txnProperties.getCommitProfilerReportInterval()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java index e4a6f4b..c00b1b6 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/AutoFaultToleranceStrategy.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.logging.Level; import java.util.logging.Logger; +import java.util.stream.Collectors; import org.apache.asterix.app.nc.task.BindMetadataNodeTask; import org.apache.asterix.app.nc.task.CheckpointTask; @@ -61,9 +62,11 @@ import org.apache.asterix.common.messaging.api.ICCMessageBroker; import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.replication.INCLifecycleMessage; import org.apache.asterix.common.replication.IReplicationStrategy; +import org.apache.asterix.common.replication.Replica; import org.apache.asterix.common.transactions.IRecoveryManager.SystemState; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.util.FaultToleranceUtil; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hyracks.api.application.ICCServiceContext; import org.apache.hyracks.api.application.IClusterLifecycleListener.ClusterEventType; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -84,9 +87,11 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { private IReplicationStrategy replicationStrategy; private ICCServiceContext serviceCtx; private Set<String> pendingStartupCompletionNodes = new HashSet<>(); + private List<String> nodeIds; + private Map<String, SystemState> startupQueue = new HashMap<>(); @Override - public void notifyNodeJoin(String nodeId) throws HyracksDataException { + public void notifyNodeJoin(String nodeId) { pendingStartupCompletionNodes.add(nodeId); } @@ -135,14 +140,13 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { private synchronized void requestPartitionsTakeover(String failedNodeId) { //replica -> list of partitions to takeover Map<String, List<Integer>> partitionRecoveryPlan = new HashMap<>(); - ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); - ReplicationProperties replicationProperties = appCtx.getReplicationProperties(); //collect the partitions of the failed NC List<ClusterPartition> lostPartitions = getNodeAssignedPartitions(failedNodeId); if (!lostPartitions.isEmpty()) { for (ClusterPartition partition : lostPartitions) { //find replicas for this partitions - Set<String> partitionReplicas = replicationProperties.getNodeReplicasIds(partition.getNodeId()); + Set<String> partitionReplicas = replicationStrategy.getRemoteReplicas(partition.getNodeId()).stream() + .map(Replica::getId).collect(Collectors.toSet()); //find a replica that is still active for (String replica : partitionReplicas) { //TODO (mhubail) currently this assigns the partition to the first found active replica. @@ -203,9 +207,8 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { planId2FailbackPlanMap.put(plan.getPlanId(), plan); //get all partitions this node requires to resync - ICcApplicationContext appCtx = (ICcApplicationContext) serviceCtx.getApplicationContext(); - ReplicationProperties replicationProperties = appCtx.getReplicationProperties(); - Set<String> nodeReplicas = replicationProperties.getNodeReplicasIds(failingBackNodeId); + Set<String> nodeReplicas = replicationStrategy.getRemoteReplicas(failingBackNodeId).stream().map(Replica::getId) + .collect(Collectors.toSet()); clusterManager.getClusterPartitons(); for (String replicaId : nodeReplicas) { ClusterPartition[] nodePartitions = clusterManager.getNodePartitions(replicaId); @@ -425,6 +428,7 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { ft.messageBroker = (ICCMessageBroker) serviceCtx.getMessageBroker(); ft.replicationStrategy = replicationStrategy; ft.serviceCtx = serviceCtx; + ft.nodeIds = serviceCtx.getAppConfig().getNCNames(); return ft; } @@ -486,18 +490,29 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { private synchronized void process(RegistrationTasksRequestMessage msg) throws HyracksDataException { final String nodeId = msg.getNodeId(); final SystemState state = msg.getState(); - List<INCLifecycleTask> tasks; - if (state == SystemState.BOOTSTRAPPING || state == SystemState.HEALTHY) { - tasks = buildStartupSequence(nodeId); + //last node needed to start + if (startupQueue.keySet().size() == nodeIds.size() - 1) { + startupQueue.put(nodeId, state); + for (Map.Entry<String, SystemState> nodeState : startupQueue.entrySet()) { + List<INCLifecycleTask> tasks = buildStartupSequence(nodeState.getKey()); + RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeState.getKey(), + tasks); + try { + messageBroker.sendApplicationMessageToNC(response, nodeState.getKey()); + } catch (Exception e) { + throw HyracksDataException.create(e); + } + } + } else if (!failedNodes.isEmpty()) { + List<INCLifecycleTask> tasks = buildFailbackStartupSequence(); + RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); + try { + messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); + } catch (Exception e) { + throw HyracksDataException.create(e); + } } else { - // failed node returned. Need to start failback process - tasks = buildFailbackStartupSequence(); - } - RegistrationTasksResponseMessage response = new RegistrationTasksResponseMessage(nodeId, tasks); - try { - messageBroker.sendApplicationMessageToNC(response, msg.getNodeId()); - } catch (Exception e) { - throw HyracksDataException.create(e); + startupQueue.put(nodeId, state); } } @@ -525,4 +540,4 @@ public class AutoFaultToleranceStrategy implements IFaultToleranceStrategy { } return tasks; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java index 4e8ecd9..fe24fca 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/FaultToleranceStrategyFactory.java @@ -18,47 +18,31 @@ */ package org.apache.asterix.app.replication; -import java.util.HashMap; -import java.util.Map; - +import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.replication.IReplicationStrategy; -import org.apache.asterix.event.schema.cluster.Cluster; import org.apache.hyracks.api.application.ICCServiceContext; public class FaultToleranceStrategyFactory { - private static final Map<String, Class<? extends IFaultToleranceStrategy>> BUILT_IN_FAULT_TOLERANCE_STRATEGY = - new HashMap<>(); - - static { - BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("no_fault_tolerance", NoFaultToleranceStrategy.class); - BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("metadata_node", MetadataNodeFaultToleranceStrategy.class); - BUILT_IN_FAULT_TOLERANCE_STRATEGY.put("auto", AutoFaultToleranceStrategy.class); - } - private FaultToleranceStrategyFactory() { throw new AssertionError(); } - public static IFaultToleranceStrategy create(Cluster cluster, IReplicationStrategy repStrategy, - ICCServiceContext serviceCtx) { - boolean highAvailabilityEnabled = - cluster.getHighAvailability() != null && cluster.getHighAvailability().getEnabled() != null - && Boolean.valueOf(cluster.getHighAvailability().getEnabled()); - - if (!highAvailabilityEnabled || cluster.getHighAvailability().getFaultTolerance() == null - || cluster.getHighAvailability().getFaultTolerance().getStrategy() == null) { - return new NoFaultToleranceStrategy().from(serviceCtx, repStrategy); - } - String strategyName = cluster.getHighAvailability().getFaultTolerance().getStrategy().toLowerCase(); - if (!BUILT_IN_FAULT_TOLERANCE_STRATEGY.containsKey(strategyName)) { - throw new IllegalArgumentException(String.format("Unsupported Replication Strategy. Available types: %s", - BUILT_IN_FAULT_TOLERANCE_STRATEGY.keySet().toString())); + public static final String STRATEGY_NAME = "metadata_only"; + + public static IFaultToleranceStrategy create(ICCServiceContext serviceCtx, ReplicationProperties repProp, + IReplicationStrategy strategy) { + Class<? extends IFaultToleranceStrategy> clazz; + if (!repProp.isReplicationEnabled()) { + clazz = NoFaultToleranceStrategy.class; + } else if (STRATEGY_NAME.equals(repProp.getReplicationStrategy())) { + clazz = MetadataNodeFaultToleranceStrategy.class; + } else { + clazz = AutoFaultToleranceStrategy.class; } - Class<? extends IFaultToleranceStrategy> clazz = BUILT_IN_FAULT_TOLERANCE_STRATEGY.get(strategyName); try { - return clazz.newInstance().from(serviceCtx, repStrategy); + return clazz.newInstance().from(serviceCtx, strategy); } catch (InstantiationException | IllegalAccessException e) { throw new IllegalStateException(e); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java index 3470d7b..0080bbd 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/replication/MetadataNodeFaultToleranceStrategy.java @@ -234,6 +234,9 @@ public class MetadataNodeFaultToleranceStrategy implements IFaultToleranceStrate final List<INCLifecycleTask> tasks = new ArrayList<>(); switch (state) { case PERMANENT_DATA_LOSS: + if (failedNodes.isEmpty()) { //bootstrap + break; + } // If the metadata node (or replica) failed and lost its data // => Metadata Remote Recovery from standby replica tasks.add(getMetadataPartitionRecoveryPlan()); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java index 6976507..5609605 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java @@ -59,7 +59,6 @@ import org.apache.asterix.app.result.ResultHandle; import org.apache.asterix.app.result.ResultReader; import org.apache.asterix.common.api.IMetadataLockManager; import org.apache.asterix.common.cluster.IClusterStateManager; -import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.DatasetConfig.DatasetType; import org.apache.asterix.common.config.DatasetConfig.ExternalFilePendingOp; import org.apache.asterix.common.config.DatasetConfig.IndexType; @@ -496,8 +495,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new CompilationException("Unknown compaction policy: " + compactionPolicy); } String compactionPolicyFactoryClassName = compactionPolicyEntity.getClassName(); - ILSMMergePolicyFactory mergePolicyFactory = - (ILSMMergePolicyFactory) Class.forName(compactionPolicyFactoryClassName).newInstance(); + ILSMMergePolicyFactory mergePolicyFactory = (ILSMMergePolicyFactory) Class + .forName(compactionPolicyFactoryClassName).newInstance(); if (isExternalDataset && mergePolicyFactory.getName().compareTo("correlated-prefix") == 0) { throw new CompilationException("The correlated-prefix merge policy cannot be used with external dataset."); } @@ -584,10 +583,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } ARecordType metaRecType = (ARecordType) metaItemType; - List<List<String>> partitioningExprs = - ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getPartitioningExprs(); - List<Integer> keySourceIndicators = - ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).getKeySourceIndicators(); + List<List<String>> partitioningExprs = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()) + .getPartitioningExprs(); + List<Integer> keySourceIndicators = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()) + .getKeySourceIndicators(); boolean autogenerated = ((InternalDetailsDecl) dd.getDatasetDetailsDecl()).isAutogenerated(); ARecordType aRecordType = (ARecordType) itemType; List<IAType> partitioningTypes = ValidateUtil.validatePartitioningExpressions(aRecordType, @@ -612,8 +611,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String adapter = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getAdapter(); Map<String, String> properties = ((ExternalDetailsDecl) dd.getDatasetDetailsDecl()).getProperties(); - datasetDetails = - new ExternalDatasetDetails(adapter, properties, new Date(), TransactionState.COMMIT); + datasetDetails = new ExternalDatasetDetails(adapter, properties, new Date(), + TransactionState.COMMIT); break; default: throw new CompilationException("Unknown datatype " + dd.getDatasetType()); @@ -704,8 +703,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen protected static void validateIfResourceIsActiveInFeed(ICcApplicationContext appCtx, Dataset dataset) throws CompilationException { StringBuilder builder = null; - ActiveNotificationHandler activeEventHandler = - (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx + .getActiveNotificationHandler(); IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners(); for (IActiveEntityEventsListener listener : listeners) { if (listener.isEntityUsingDataset(dataset) && listener.isActive()) { @@ -808,8 +807,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen for (Pair<List<String>, IndexedTypeExpression> fieldExpr : stmtCreateIndex.getFieldExprs()) { IAType fieldType = null; - ARecordType subType = - KeyFieldTypeUtil.chooseSource(keySourceIndicators, keyIndex, aRecordType, metaRecordType); + ARecordType subType = KeyFieldTypeUtil.chooseSource(keySourceIndicators, keyIndex, aRecordType, + metaRecordType); boolean isOpen = subType.isOpen(); int i = 0; if (fieldExpr.first.size() > 1 && !isOpen) { @@ -845,8 +844,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen if (stmtCreateIndex.hasMetaField()) { throw new AlgebricksException("Typed open index can only be created on the record part"); } - Map<TypeSignature, IAType> typeMap = - TypeTranslator.computeTypes(mdTxnCtx, fieldExpr.second.getType(), indexName, dataverseName); + Map<TypeSignature, IAType> typeMap = TypeTranslator.computeTypes(mdTxnCtx, + fieldExpr.second.getType(), indexName, dataverseName); TypeSignature typeSignature = new TypeSignature(dataverseName, indexName); fieldType = typeMap.get(typeSignature); overridesFieldTypes = true; @@ -1061,8 +1060,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); bActiveTxn = true; try { - JobSpecification jobSpec = - ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, ds); + JobSpecification jobSpec = ExternalIndexingOperations.buildDropFilesIndexJobSpec(metadataProvider, + ds); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; runJob(hcc, jobSpec, jobFlags); @@ -1218,8 +1217,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } } // # check whether any function in current dataverse is being used by others - List<Function> functionsInDataverse = - MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, dataverseName); + List<Function> functionsInDataverse = MetadataManager.INSTANCE.getDataverseFunctions(mdTxnCtx, + dataverseName); for (Function function : functionsInDataverse) { if (checkWhetherFunctionIsBeingUsed(mdTxnCtx, function.getDataverseName(), function.getName(), function.getArity(), dataverseName)) { @@ -1230,8 +1229,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; // # disconnect all feeds from any datasets in the dataverse. - ActiveNotificationHandler activeEventHandler = - (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx + .getActiveNotificationHandler(); IActiveEntityEventsListener[] activeListeners = activeEventHandler.getEventListeners(); for (IActiveEntityEventsListener listener : activeListeners) { EntityId activeEntityId = listener.getEntityId(); @@ -1259,15 +1258,15 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String datasetName = dataset.getDatasetName(); DatasetType dsType = dataset.getDatasetType(); if (dsType == DatasetType.INTERNAL) { - List<Index> indexes = - MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); + List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, + datasetName); for (Index index : indexes) { jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, dataset)); } } else { // External dataset - List<Index> indexes = - MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); + List<Index> indexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, + datasetName); for (int k = 0; k < indexes.size(); k++) { if (ExternalIndexingOperations.isFileIndex(indexes.get(k))) { jobsToExecute.add( @@ -1375,8 +1374,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen public static void doDropDataset(String dataverseName, String datasetName, MetadataProvider metadataProvider, boolean ifExists, IHyracksClientConnection hcc, boolean dropCorrespondingNodeGroup) throws Exception { MutableObject<ProgressState> progress = new MutableObject<>(ProgressState.NO_PROGRESS); - MutableObject<MetadataTransactionContext> mdTxnCtx = - new MutableObject<>(MetadataManager.INSTANCE.beginTransaction()); + MutableObject<MetadataTransactionContext> mdTxnCtx = new MutableObject<>( + MetadataManager.INSTANCE.beginTransaction()); MutableBoolean bActiveTxn = new MutableBoolean(true); metadataProvider.setMetadataTxnContext(mdTxnCtx.getValue()); List<JobSpecification> jobsToExecute = new ArrayList<>(); @@ -1451,8 +1450,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new AlgebricksException( "There is no dataset with this name " + datasetName + " in dataverse " + dataverseName); } - ActiveNotificationHandler activeEventHandler = - (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx + .getActiveNotificationHandler(); IActiveEntityEventsListener[] listeners = activeEventHandler.getEventListeners(); StringBuilder builder = null; for (IActiveEntityEventsListener listener : listeners) { @@ -1521,8 +1520,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } // #. prepare a job to drop the index in NC. jobsToExecute.add(IndexUtil.buildDropIndexJobSpec(index, metadataProvider, ds)); - List<Index> datasetIndexes = - MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, datasetName); + List<Index> datasetIndexes = MetadataManager.INSTANCE.getDatasetIndexes(mdTxnCtx, dataverseName, + datasetName); if (datasetIndexes.size() == 2) { dropFilesIndex = true; // only one index + the files index, we need to delete both of the indexes @@ -1763,9 +1762,9 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataLockUtil.modifyDatasetBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName); try { - CompiledLoadFromFileStatement cls = - new CompiledLoadFromFileStatement(dataverseName, loadStmt.getDatasetName().getValue(), - loadStmt.getAdapter(), loadStmt.getProperties(), loadStmt.dataIsAlreadySorted()); + CompiledLoadFromFileStatement cls = new CompiledLoadFromFileStatement(dataverseName, + loadStmt.getDatasetName().getValue(), loadStmt.getAdapter(), loadStmt.getProperties(), + loadStmt.dataIsAlreadySorted()); JobSpecification spec = apiFramework.compileQuery(hcc, metadataProvider, null, 0, null, sessionOutput, cls); MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); bActiveTxn = false; @@ -1882,8 +1881,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throws RemoteException, AlgebricksException, ACIDException { // Query Rewriting (happens under the same ongoing metadata transaction) - Pair<IReturningStatement, Integer> rewrittenResult = - apiFramework.reWriteQuery(declaredFunctions, metadataProvider, query, sessionOutput); + Pair<IReturningStatement, Integer> rewrittenResult = apiFramework.reWriteQuery(declaredFunctions, + metadataProvider, query, sessionOutput); // Query Compilation (happens under the same ongoing metadata transaction) return apiFramework.compileQuery(clusterInfoCollector, metadataProvider, (Query) rewrittenResult.first, @@ -1896,8 +1895,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen // Insert/upsert statement rewriting (happens under the same ongoing metadata // transaction) - Pair<IReturningStatement, Integer> rewrittenResult = - apiFramework.reWriteQuery(declaredFunctions, metadataProvider, insertUpsert, sessionOutput); + Pair<IReturningStatement, Integer> rewrittenResult = apiFramework.reWriteQuery(declaredFunctions, + metadataProvider, insertUpsert, sessionOutput); InsertStatement rewrittenInsertUpsert = (InsertStatement) rewrittenResult.first; String dataverseName = getActiveDataverse(rewrittenInsertUpsert.getDataverseName()); @@ -1969,8 +1968,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen try { mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); - FeedPolicyEntity feedPolicy = - MetadataManager.INSTANCE.getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy); + FeedPolicyEntity feedPolicy = MetadataManager.INSTANCE + .getFeedPolicy(metadataProvider.getMetadataTxnContext(), dataverse, policy); if (feedPolicy != null) { if (cfps.getIfNotExists()) { MetadataManager.INSTANCE.commitTransaction(mdTxnCtx); @@ -2048,10 +2047,10 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throws Exception { MetadataTransactionContext mdTxnCtx = metadataProvider.getMetadataTxnContext(); EntityId feedId = feed.getFeedId(); - ActiveNotificationHandler activeNotificationHandler = - (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); - ActiveEntityEventsListener listener = - (ActiveEntityEventsListener) activeNotificationHandler.getListener(feedId); + ActiveNotificationHandler activeNotificationHandler = (ActiveNotificationHandler) appCtx + .getActiveNotificationHandler(); + ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeNotificationHandler + .getListener(feedId); if (listener != null && listener.getState() != ActivityState.STOPPED) { throw new AlgebricksException("Feed " + feedId + " is currently active and connected to the following dataset(s) \n" + listener.toString()); @@ -2120,15 +2119,15 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String fqName = feedConnection.getDataverseName() + "." + feedConnection.getDatasetName(); lockManager.acquireDatasetReadLock(metadataProvider.getLocks(), fqName); } - ActiveNotificationHandler activeEventHandler = - (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx + .getActiveNotificationHandler(); ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler.getListener(entityId); if (listener == null) { // Prepare policy List<Dataset> datasets = new ArrayList<>(); for (FeedConnection connection : feedConnections) { - Dataset ds = - metadataProvider.findDataset(connection.getDataverseName(), connection.getDatasetName()); + Dataset ds = metadataProvider.findDataset(connection.getDataverseName(), + connection.getDatasetName()); datasets.add(ds); } listener = new FeedEventsListener(this, metadataProvider.getApplicationContext(), hcc, entityId, @@ -2153,8 +2152,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String dataverseName = getActiveDataverse(sfst.getDataverseName()); String feedName = sfst.getFeedName().getValue(); EntityId entityId = new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName); - ActiveNotificationHandler activeEventHandler = - (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx + .getActiveNotificationHandler(); // Obtain runtime info from ActiveListener ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler.getListener(entityId); if (listener == null) { @@ -2180,8 +2179,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen metadataProvider.setMetadataTxnContext(mdTxnCtx); // TODO: Check whether we are connecting a change feed to a non-meta dataset // Check whether feed is alive - ActiveNotificationHandler activeEventHandler = - (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx + .getActiveNotificationHandler(); // Transaction handling MetadataLockUtil.connectFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName, dataverseName + "." + feedName); @@ -2234,8 +2233,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen MetadataLockUtil.disconnectFeedBegin(lockManager, metadataProvider.getLocks(), dataverseName, dataverseName + "." + datasetName, dataverseName + "." + cfs.getFeedName()); try { - ActiveNotificationHandler activeEventHandler = - (ActiveNotificationHandler) appCtx.getActiveNotificationHandler(); + ActiveNotificationHandler activeEventHandler = (ActiveNotificationHandler) appCtx + .getActiveNotificationHandler(); // Check whether feed is alive ActiveEntityEventsListener listener = (ActiveEntityEventsListener) activeEventHandler .getListener(new EntityId(Feed.EXTENSION_NAME, dataverseName, feedName)); @@ -2290,8 +2289,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new AlgebricksException( "Cannot compact the extrenal dataset " + datasetName + " because it has no indexes"); } - Dataverse dataverse = - MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), dataverseName); + Dataverse dataverse = MetadataManager.INSTANCE.getDataverse(metadataProvider.getMetadataTxnContext(), + dataverseName); jobsToExecute.add(DatasetUtil.compactDatasetJobSpec(dataverse, datasetName, metadataProvider)); if (ds.getDatasetType() == DatasetType.INTERNAL) { @@ -2421,8 +2420,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen } private void updateJobStats(JobId jobId, Stats stats) { - final IJobManager jobManager = - ((ClusterControllerService) appCtx.getServiceContext().getControllerService()).getJobManager(); + final IJobManager jobManager = ((ClusterControllerService) appCtx.getServiceContext().getControllerService()) + .getJobManager(); final JobRun run = jobManager.get(jobId); if (run == null || run.getStatus() != JobStatus.TERMINATED) { return; @@ -2790,8 +2789,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String dataverseNameTo = getActiveDataverse(pregelixStmt.getDataverseNameTo()); String datasetNameFrom = pregelixStmt.getDatasetNameFrom().getValue(); String datasetNameTo = pregelixStmt.getDatasetNameTo().getValue(); - String fullyQualifiedDatasetNameTo = - DatasetUtil.isFullyQualifiedName(datasetNameTo) ? datasetNameTo : dataverseNameTo + '.' + datasetNameTo; + String fullyQualifiedDatasetNameTo = DatasetUtil.isFullyQualifiedName(datasetNameTo) ? datasetNameTo + : dataverseNameTo + '.' + datasetNameTo; MetadataTransactionContext mdTxnCtx = MetadataManager.INSTANCE.beginTransaction(); metadataProvider.setMetadataTxnContext(mdTxnCtx); MetadataLockUtil.insertDeleteUpsertBegin(lockManager, metadataProvider.getLocks(), fullyQualifiedDatasetNameTo); @@ -2874,8 +2873,8 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen throw new AlgebricksException("Tried to access non-existing dataset: " + datasetNameTo); } // Cleans up the sink dataset -- Drop and then Create. - DropDatasetStatement dropStmt = - new DropDatasetStatement(new Identifier(dataverseNameTo), pregelixStmt.getDatasetNameTo(), true); + DropDatasetStatement dropStmt = new DropDatasetStatement(new Identifier(dataverseNameTo), + pregelixStmt.getDatasetNameTo(), true); this.handleDatasetDropStatement(metadataProvider, dropStmt, hcc, null); IDatasetDetailsDecl idd = new InternalDetailsDecl(toIndex.getKeyFieldNames(), toIndex.getKeyFieldSourceIndicators(), false, null); @@ -2969,7 +2968,7 @@ public class QueryTranslator extends AbstractLangTranslator implements IStatemen String fromDatasetName, String toDataverseName, String toDatasetName) { // Constructs AsterixDB parameters, e.g., URL, source dataset and sink dataset. ExternalProperties externalProperties = appCtx.getExternalProperties(); - String clientIP = ClusterProperties.INSTANCE.getCluster().getMasterNode().getClientIp(); + String clientIP = appCtx.getServiceContext().getCCContext().getClusterControllerInfo().getClientNetAddress(); StringBuilder asterixdbParameterBuilder = new StringBuilder(); asterixdbParameterBuilder.append( "pregelix.asterixdb.url=" + "http://" + clientIP + ":" + externalProperties.getAPIServerPort() + ","); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java deleted file mode 100644 index c9b3565..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/drivers/AsterixCLI.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.drivers; - -import java.io.File; -import java.io.FileReader; -import java.io.Reader; -import java.util.List; - -import org.apache.asterix.api.common.AsterixHyracksIntegrationUtil; -import org.apache.asterix.api.java.AsterixJavaClient; -import org.apache.asterix.app.translator.DefaultStatementExecutorFactory; -import org.apache.asterix.common.config.GlobalConfig; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.compiler.provider.AqlCompilationProvider; -import org.apache.asterix.compiler.provider.ILangCompilationProvider; -import org.apache.asterix.file.StorageComponentProvider; -import org.apache.commons.io.FileUtils; -import org.kohsuke.args4j.Argument; -import org.kohsuke.args4j.CmdLineParser; -import org.kohsuke.args4j.Option; - -public class AsterixCLI { - private static class Options { - @Option(name = "-properties", usage = "Name of properties file", required = true) - public String properties; - - @Option(name = "-output", usage = "Output folder to place results", required = true) - public String outputFolder; - - @Argument(usage = "AQL Files to run", multiValued = true, required = true) - public List<String> args; - } - - private static AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); - - public static void main(String args[]) throws Exception { - Options options = new Options(); - CmdLineParser parser = new CmdLineParser(options); - parser.parseArgument(args); - ILangCompilationProvider compilationProvider = new AqlCompilationProvider(); - setUp(options); - try { - for (String queryFile : options.args) { - Reader in = new FileReader(queryFile); - AsterixJavaClient ajc = - new AsterixJavaClient((ICcApplicationContext) integrationUtil.cc.getApplicationContext(), - integrationUtil.getHyracksClientConnection(), in, - compilationProvider, new DefaultStatementExecutorFactory(), new StorageComponentProvider()); - try { - ajc.compile(true, false, false, false, false, true, false); - } finally { - in.close(); - } - ajc.execute(); - } - } finally { - tearDown(); - } - System.exit(0); - } - - public static void setUp(Options options) throws Exception { - System.setProperty(GlobalConfig.CONFIG_FILE_PROPERTY, options.properties); - File outdir = new File(options.outputFolder); - outdir.mkdirs(); - - File log = new File("asterix_logs"); - if (log.exists()) { - FileUtils.deleteDirectory(log); - } - File lsn = new File("last_checkpoint_lsn"); - lsn.deleteOnExit(); - - integrationUtil.init(false); - } - - public static void tearDown() throws Exception { - integrationUtil.deinit(false); - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java index 8283257..40d8996 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/CCApplication.java @@ -26,6 +26,7 @@ import static org.apache.asterix.api.http.server.ServletConstants.HYRACKS_CONNEC import static org.apache.asterix.common.api.IClusterManagementWork.ClusterState.SHUTTING_DOWN; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.ConcurrentMap; @@ -58,17 +59,19 @@ import org.apache.asterix.app.cc.CCExtensionManager; import org.apache.asterix.app.external.ExternalLibraryUtils; import org.apache.asterix.app.replication.FaultToleranceStrategyFactory; import org.apache.asterix.common.api.AsterixThreadFactory; -import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.api.INodeJobTracker; import org.apache.asterix.common.config.AsterixExtension; -import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.config.ExternalProperties; import org.apache.asterix.common.config.MetadataProperties; +import org.apache.asterix.common.config.PropertiesAccessor; +import org.apache.asterix.common.config.ReplicationProperties; import org.apache.asterix.common.context.IStorageComponentProvider; import org.apache.asterix.common.dataflow.ICcApplicationContext; import org.apache.asterix.common.library.ILibraryManager; import org.apache.asterix.common.replication.IFaultToleranceStrategy; import org.apache.asterix.common.replication.IReplicationStrategy; +import org.apache.asterix.common.replication.ReplicationStrategyFactory; import org.apache.asterix.common.utils.Servlets; import org.apache.asterix.external.library.ExternalLibraryManager; import org.apache.asterix.file.StorageComponentProvider; @@ -76,7 +79,6 @@ import org.apache.asterix.messaging.CCMessageBroker; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.api.IAsterixStateProxy; import org.apache.asterix.metadata.bootstrap.AsterixStateProxy; -import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.metadata.lock.MetadataLockManager; import org.apache.asterix.runtime.job.resource.JobCapacityController; import org.apache.asterix.runtime.utils.CcApplicationContext; @@ -122,8 +124,8 @@ public class CCApplication extends BaseCCApplication { if (args.length > 0) { throw new IllegalArgumentException("Unrecognized argument(s): " + Arrays.toString(args)); } - final ClusterControllerService controllerService = - (ClusterControllerService) ccServiceCtx.getControllerService(); + final ClusterControllerService controllerService = (ClusterControllerService) ccServiceCtx + .getControllerService(); ccServiceCtx.setMessageBroker(new CCMessageBroker(controllerService)); configureLoggingLevel(ccServiceCtx.getAppConfig().getLoggingLevel(ExternalProperties.Option.LOG_LEVEL)); @@ -136,16 +138,19 @@ public class CCApplication extends BaseCCApplication { int port = ccServiceCtx.getCCContext().getClusterControllerInfo().getClientNetPort(); hcc = new HyracksConnection(strIP, port); ILibraryManager libraryManager = new ExternalLibraryManager(); - - IReplicationStrategy repStrategy = ClusterProperties.INSTANCE.getReplicationStrategy(); - IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory - .create(ClusterProperties.INSTANCE.getCluster(), repStrategy, ccServiceCtx); + ReplicationProperties repProp = new ReplicationProperties( + PropertiesAccessor.getInstance(ccServiceCtx.getAppConfig())); + IReplicationStrategy repStrategy = ReplicationStrategyFactory.create(repProp.getReplicationStrategy(), repProp, + getConfigManager()); + IFaultToleranceStrategy ftStrategy = FaultToleranceStrategyFactory.create(ccServiceCtx, repProp, repStrategy); ExternalLibraryUtils.setUpExternaLibraries(libraryManager, false); componentProvider = new StorageComponentProvider(); GlobalRecoveryManager globalRecoveryManager = createGlobalRecoveryManager(); statementExecutorCtx = new StatementExecutorContext(); appCtx = createApplicationContext(libraryManager, globalRecoveryManager, ftStrategy); - ccExtensionManager = new CCExtensionManager(getExtensions()); + List<AsterixExtension> extensions = new ArrayList<>(); + extensions.addAll(this.getExtensions()); + ccExtensionManager = new CCExtensionManager(extensions); appCtx.setExtensionManager(ccExtensionManager); final CCConfig ccConfig = controllerService.getCCConfig(); if (System.getProperty("java.rmi.server.hostname") == null) { @@ -162,7 +167,6 @@ public class CCApplication extends BaseCCApplication { webManager = new WebManager(); configureServers(); webManager.start(); - ClusterManagerProvider.getClusterManager().registerSubscriber(globalRecoveryManager); ccServiceCtx.addClusterLifecycleListener(new ClusterLifecycleListener(appCtx)); final INodeJobTracker nodeJobTracker = appCtx.getNodeJobTracker(); ccServiceCtx.addJobLifecycleListener(nodeJobTracker); @@ -222,8 +226,8 @@ public class CCApplication extends BaseCCApplication { } protected HttpServer setupJSONAPIServer(ExternalProperties externalProperties) throws Exception { - HttpServer jsonAPIServer = - new HttpServer(webManager.getBosses(), webManager.getWorkers(), externalProperties.getAPIServerPort()); + HttpServer jsonAPIServer = new HttpServer(webManager.getBosses(), webManager.getWorkers(), + externalProperties.getAPIServerPort()); jsonAPIServer.setAttribute(HYRACKS_CONNECTION_ATTR, hcc); jsonAPIServer.setAttribute(ASTERIX_APP_CONTEXT_INFO_ATTR, appCtx); jsonAPIServer.setAttribute(ServletConstants.EXECUTOR_SERVICE_ATTR, @@ -346,8 +350,7 @@ public class CCApplication extends BaseCCApplication { @Override public void startupCompleted() throws Exception { ccServiceCtx.getControllerService().getExecutor().submit(() -> { - appCtx.getClusterStateManager().waitForState(ClusterState.ACTIVE); - ClusterManagerProvider.getClusterManager().notifyStartupCompleted(); + appCtx.getClusterStateManager().waitForState(IClusterManagementWork.ClusterState.ACTIVE); return null; }); } http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java index 8f479eb..217d6e2 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterLifecycleListener.java @@ -28,18 +28,14 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.logging.Level; import java.util.logging.Logger; -import org.apache.asterix.common.api.IClusterEventsSubscriber; import org.apache.asterix.common.api.IClusterManagementWork; import org.apache.asterix.common.api.IClusterManagementWorkResponse; import org.apache.asterix.common.api.IClusterManagementWorkResponse.Status; import org.apache.asterix.common.cluster.IClusterStateManager; -import org.apache.asterix.common.config.ClusterProperties; import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.event.schema.cluster.Node; import org.apache.asterix.metadata.MetadataManager; import org.apache.asterix.metadata.cluster.AddNodeWork; import org.apache.asterix.metadata.cluster.AddNodeWorkResponse; -import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.metadata.cluster.RemoveNodeWork; import org.apache.asterix.metadata.cluster.RemoveNodeWorkResponse; import org.apache.hyracks.api.application.IClusterLifecycleListener; @@ -56,7 +52,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener { public ClusterLifecycleListener(ICcApplicationContext appCtx) { this.appCtx = appCtx; - eventHandler = new ClusterWorkExecutor(appCtx, workRequestQueue); + eventHandler = new ClusterWorkExecutor(workRequestQueue); Thread t = new Thread(eventHandler); if (LOGGER.isLoggable(Level.INFO)) { LOGGER.info("Starting cluster event handler"); @@ -80,17 +76,6 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener { Set<String> nodeAddition = new HashSet<>(); nodeAddition.add(nodeId); updateProgress(ClusterEventType.NODE_JOIN, nodeAddition); - Set<IClusterEventsSubscriber> subscribers = - ClusterManagerProvider.getClusterManager().getRegisteredClusterEventSubscribers(); - Set<IClusterManagementWork> work = new HashSet<>(); - for (IClusterEventsSubscriber sub : subscribers) { - Set<IClusterManagementWork> workRequest = sub.notifyNodeJoin(nodeId); - work.addAll(workRequest); - } - if (!work.isEmpty()) { - executeWorkSet(work); - } - } @Override @@ -108,13 +93,7 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener { } } updateProgress(ClusterEventType.NODE_FAILURE, deadNodeIds); - Set<IClusterEventsSubscriber> subscribers = - ClusterManagerProvider.getClusterManager().getRegisteredClusterEventSubscribers(); Set<IClusterManagementWork> work = new HashSet<>(); - for (IClusterEventsSubscriber sub : subscribers) { - Set<IClusterManagementWork> workRequest = sub.notifyNodeFailure(deadNodeIds); - work.addAll(workRequest); - } if (!work.isEmpty()) { executeWorkSet(work); } @@ -163,38 +142,14 @@ public class ClusterLifecycleListener implements IClusterLifecycleListener { case REMOVE_NODE: nodesToRemove.addAll(((RemoveNodeWork) w).getNodesToBeRemoved()); nodeRemovalRequests.add(w); - RemoveNodeWorkResponse response = - new RemoveNodeWorkResponse((RemoveNodeWork) w, Status.IN_PROGRESS); + RemoveNodeWorkResponse response = new RemoveNodeWorkResponse((RemoveNodeWork) w, + Status.IN_PROGRESS); pendingWorkResponses.add(response); break; } } List<String> addedNodes = new ArrayList<>(); - String asterixInstanceName = ClusterProperties.INSTANCE.getCluster().getInstanceName(); - IClusterStateManager csm = appCtx.getClusterStateManager(); - for (int i = 0; i < nodesToAdd; i++) { - Node node = csm.getAvailableSubstitutionNode(); - if (node != null) { - try { - ClusterManagerProvider.getClusterManager().addNode(appCtx, node); - addedNodes.add(asterixInstanceName + "_" + node.getId()); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Added NC at:" + node.getId()); - } - } catch (Exception e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Unable to add NC at:" + node.getId()); - } - e.printStackTrace(); - } - } else { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Unable to add NC: no more available nodes"); - } - - } - } for (AddNodeWork w : nodeAdditionRequests) { int n = w.getNumberOfNodesRequested(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/5dcf139e/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java index 2977a58..bcc1c60 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/hyracks/bootstrap/ClusterWorkExecutor.java @@ -25,23 +25,16 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.asterix.common.api.IClusterManagementWork; -import org.apache.asterix.common.cluster.IClusterStateManager; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.event.schema.cluster.Node; import org.apache.asterix.metadata.cluster.AddNodeWork; -import org.apache.asterix.metadata.cluster.ClusterManagerProvider; import org.apache.asterix.metadata.cluster.RemoveNodeWork; public class ClusterWorkExecutor implements Runnable { private static final Logger LOGGER = Logger.getLogger(ClusterWorkExecutor.class.getName()); - private final ICcApplicationContext appCtx; private final LinkedBlockingQueue<Set<IClusterManagementWork>> inbox; - public ClusterWorkExecutor(ICcApplicationContext appCtx, LinkedBlockingQueue<Set<IClusterManagementWork>> inbox) { - this.appCtx = appCtx; + public ClusterWorkExecutor(LinkedBlockingQueue<Set<IClusterManagementWork>> inbox) { this.inbox = inbox; } @@ -69,30 +62,6 @@ public class ClusterWorkExecutor implements Runnable { } } - IClusterStateManager csm = appCtx.getClusterStateManager(); - Set<Node> addedNodes = new HashSet<>(); - for (int i = 0; i < nodesToAdd; i++) { - Node node = csm.getAvailableSubstitutionNode(); - if (node != null) { - try { - ClusterManagerProvider.getClusterManager().addNode(appCtx, node); - addedNodes.add(node); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Added NC at:" + node.getId()); - } - } catch (AsterixException e) { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Unable to add NC at:" + node.getId()); - } - e.printStackTrace(); - } - } else { - if (LOGGER.isLoggable(Level.WARNING)) { - LOGGER.warning("Unable to add NC: no more available nodes"); - } - } - } - } catch (InterruptedException e) { if (LOGGER.isLoggable(Level.SEVERE)) { LOGGER.severe("interruped" + e.getMessage());
