Repository: asterixdb Updated Branches: refs/heads/master 621daae69 -> 3b60f6168
[NO ISSUE] Move AsterixHyracksIntegrationUtil from production to test Change-Id: Id603d0f1ac17b977356e628a89845d240c8aa8b7 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2311 Tested-by: Jenkins <[email protected]> Integration-Tests: Jenkins <[email protected]> Contrib: Jenkins <[email protected]> Reviewed-by: Till Westmann <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/asterixdb/repo Commit: http://git-wip-us.apache.org/repos/asf/asterixdb/commit/3b60f616 Tree: http://git-wip-us.apache.org/repos/asf/asterixdb/tree/3b60f616 Diff: http://git-wip-us.apache.org/repos/asf/asterixdb/diff/3b60f616 Branch: refs/heads/master Commit: 3b60f61681e8be9b145080956f4e13b37a229741 Parents: 621daae Author: Michael Blow <[email protected]> Authored: Mon Jan 22 17:37:13 2018 -0500 Committer: Michael Blow <[email protected]> Committed: Tue Jan 23 08:54:32 2018 -0800 ---------------------------------------------------------------------- .../asterix/api/common/AsterixClientConfig.java | 4 +- .../common/AsterixHyracksIntegrationUtil.java | 390 ------------------- .../common/AsterixHyracksIntegrationUtil.java | 390 +++++++++++++++++++ 3 files changed, 392 insertions(+), 392 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3b60f616/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixClientConfig.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixClientConfig.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixClientConfig.java index 9db71d1..b464781 100644 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixClientConfig.java +++ b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixClientConfig.java @@ -37,8 +37,8 @@ public class AsterixClientConfig { @Option(name = "-hyracks-job", usage = "Generates and prints the Hyracks job. It is false by default.") public String hyracksJob = "false"; - @Option(name = "-hyracks-port", usage = "The port used to connect to the Hyracks server.") - public int hyracksPort = AsterixHyracksIntegrationUtil.DEFAULT_HYRACKS_CC_CLIENT_PORT; + @Option(name = "-hyracks-port", usage = "The port used to connect to the Hyracks server. (default: 1098)") + public int hyracksPort = 1098; @Argument private List<String> arguments = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3b60f616/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java deleted file mode 100644 index 97079eb..0000000 --- a/asterixdb/asterix-app/src/main/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java +++ /dev/null @@ -1,390 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.asterix.api.common; - -import static org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER; -import static org.apache.hyracks.util.file.FileUtil.joinPath; - -import java.io.File; -import java.io.IOException; -import java.net.Inet4Address; -import java.nio.file.Files; -import java.nio.file.Path; -import java.nio.file.Paths; -import java.nio.file.attribute.BasicFileAttributes; -import java.util.ArrayList; -import java.util.List; -import java.util.function.BiPredicate; -import java.util.stream.Stream; - -import org.apache.asterix.app.external.ExternalUDFLibrarian; -import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; -import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.common.config.PropertiesAccessor; -import org.apache.asterix.common.dataflow.ICcApplicationContext; -import org.apache.asterix.common.exceptions.AsterixException; -import org.apache.asterix.common.library.ILibraryManager; -import org.apache.asterix.hyracks.bootstrap.CCApplication; -import org.apache.asterix.hyracks.bootstrap.NCApplication; -import org.apache.commons.io.FileUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.apache.hyracks.api.application.ICCApplication; -import org.apache.hyracks.api.application.INCApplication; -import org.apache.hyracks.api.client.HyracksConnection; -import org.apache.hyracks.api.client.IHyracksClientConnection; -import org.apache.hyracks.api.config.IOption; -import org.apache.hyracks.control.cc.ClusterControllerService; -import org.apache.hyracks.control.common.config.ConfigManager; -import org.apache.hyracks.control.common.controllers.CCConfig; -import org.apache.hyracks.control.common.controllers.ControllerConfig; -import org.apache.hyracks.control.common.controllers.NCConfig; -import org.apache.hyracks.control.nc.NodeControllerService; -import org.apache.logging.log4j.Level; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; -import org.kohsuke.args4j.CmdLineException; - -@SuppressWarnings({ "squid:ClassVariableVisibilityCheck", "squid:S00112" }) -public class AsterixHyracksIntegrationUtil { - - public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098; - public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099; - public static final String DEFAULT_CONF_FILE = joinPath(getProjectPath().toString(), "src", "test", "resources", - "cc.conf"); - private static final String DEFAULT_STORAGE_PATH = joinPath("target", "io", "dir"); - private static String storagePath = DEFAULT_STORAGE_PATH; - - static { - System.setProperty("java.util.logging.manager", org.apache.logging.log4j.jul.LogManager.class.getName()); - } - - public ClusterControllerService cc; - public NodeControllerService[] ncs = new NodeControllerService[2]; - public IHyracksClientConnection hcc; - protected boolean gracefulShutdown = true; - List<Pair<IOption, Object>> opts = new ArrayList<>(); - private ConfigManager configManager; - private List<String> nodeNames; - - public static void setStoragePath(String path) { - storagePath = path; - } - - public static void restoreDefaultStoragePath() { - storagePath = DEFAULT_STORAGE_PATH; - } - - /** - * main method to run a simple 2 node cluster in-process - * suggested VM arguments: <code>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8</code> - * - * @param args unused - */ - public static void main(String[] args) throws Exception { - AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); - try { - integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"), - System.getProperty("external.lib", ""), System.getProperty("conf.path", DEFAULT_CONF_FILE)); - } catch (Exception e) { - LOGGER.log(Level.WARN, "Unexpected exception", e); - System.exit(1); - } - } - - public void init(boolean deleteOldInstanceData, String confFile) throws Exception { //NOSONAR - final ICCApplication ccApplication = createCCApplication(); - if (confFile == null) { - configManager = new ConfigManager(); - } else { - configManager = new ConfigManager(new String[] { "-config-file", confFile }); - } - ccApplication.registerConfig(configManager); - final CCConfig ccConfig = createCCConfig(configManager); - cc = new ClusterControllerService(ccConfig, ccApplication); - - nodeNames = ccConfig.getConfigManager().getNodeNames(); - if (deleteOldInstanceData) { - deleteTransactionLogs(); - removeTestStorageFiles(); - } - final List<NodeControllerService> nodeControllers = new ArrayList<>(); - for (String nodeId : nodeNames) { - // mark this NC as virtual in the CC's config manager, so he doesn't try to contact NCService... - configManager.set(nodeId, NCConfig.Option.NCSERVICE_PORT, NCConfig.NCSERVICE_PORT_DISABLED); - final INCApplication ncApplication = createNCApplication(); - ConfigManager ncConfigManager; - if (confFile == null) { - ncConfigManager = new ConfigManager(); - } else { - ncConfigManager = new ConfigManager(new String[] { "-config-file", confFile }); - } - ncApplication.registerConfig(ncConfigManager); - nodeControllers.add(new NodeControllerService(fixupIODevices(createNCConfig(nodeId, ncConfigManager)), - ncApplication)); - } - - opts.stream().forEach(opt -> configManager.set(opt.getLeft(), opt.getRight())); - cc.start(); - - // Starts ncs. - nodeNames = ccConfig.getConfigManager().getNodeNames(); - List<Thread> startupThreads = new ArrayList<>(); - for (NodeControllerService nc : nodeControllers) { - Thread ncStartThread = new Thread("IntegrationUtil-" + nc.getId()) { - @Override - public void run() { - try { - nc.start(); - } catch (Exception e) { - LOGGER.log(Level.ERROR, e.getMessage(), e); - } - } - }; - ncStartThread.start(); - startupThreads.add(ncStartThread); - } - //wait until all NCs complete their startup - for (Thread thread : startupThreads) { - thread.join(); - } - // Wait until cluster becomes active - ((ICcApplicationContext) cc.getApplicationContext()).getClusterStateManager().waitForState(ClusterState.ACTIVE); - hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), cc.getConfig().getClientListenPort()); - this.ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]); - } - - public void init(boolean deleteOldInstanceData, String externalLibPath, String confDir) throws Exception { - List<ILibraryManager> libraryManagers = new ArrayList<>(); - ExternalUDFLibrarian librarian = new ExternalUDFLibrarian(libraryManagers); - librarian.cleanup(); - init(deleteOldInstanceData, confDir); - if (externalLibPath != null && externalLibPath.length() != 0) { - libraryManagers.add(((ICcApplicationContext) cc.getApplicationContext()).getLibraryManager()); - for (NodeControllerService nc : ncs) { - INcApplicationContext runtimeCtx = (INcApplicationContext) nc.getApplicationContext(); - libraryManagers.add(runtimeCtx.getLibraryManager()); - } - librarian.install(System.getProperty("external.lib.dataverse", "test"), - System.getProperty("external.lib.libname", "testlib"), externalLibPath); - } - } - - public ClusterControllerService getClusterControllerService() { - return cc; - } - - protected CCConfig createCCConfig(ConfigManager configManager) throws IOException { - CCConfig ccConfig = new CCConfig(configManager); - ccConfig.setClusterListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); - ccConfig.setClientListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); - ccConfig.setClientListenPort(DEFAULT_HYRACKS_CC_CLIENT_PORT); - ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT); - ccConfig.setResultTTL(120000L); - ccConfig.setResultSweepThreshold(1000L); - ccConfig.setEnforceFrameWriterProtocol(true); - configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb")); - return ccConfig; - } - - protected ICCApplication createCCApplication() { - return new CCApplication(); - } - - protected NCConfig createNCConfig(String ncName, ConfigManager configManager) { - NCConfig ncConfig = new NCConfig(ncName, configManager); - ncConfig.setClusterAddress("localhost"); - ncConfig.setClusterPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT); - ncConfig.setClusterListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); - ncConfig.setDataListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); - ncConfig.setResultListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); - ncConfig.setMessagingListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); - ncConfig.setResultTTL(120000L); - ncConfig.setResultSweepThreshold(1000L); - ncConfig.setVirtualNC(); - configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb", ncName)); - return ncConfig; - } - - protected INCApplication createNCApplication() { - if (!gracefulShutdown) { - return new UngracefulShutdownNCApplication(); - } - return new NCApplication(); - } - - private NCConfig fixupIODevices(NCConfig ncConfig) throws IOException, AsterixException, CmdLineException { - // we have to first process the config - ncConfig.getConfigManager().processConfig(); - - // get initial partitions from config - String[] nodeStores = ncConfig.getNodeScopedAppConfig().getStringArray(NCConfig.Option.IODEVICES); - if (nodeStores == null) { - throw new IllegalStateException("Couldn't find stores for NC: " + ncConfig.getNodeId()); - } - LOGGER.info("Using the path: " + getDefaultStoragePath()); - for (int i = 0; i < nodeStores.length; i++) { - // create IO devices based on stores - nodeStores[i] = joinPath(getDefaultStoragePath(), ncConfig.getNodeId(), nodeStores[i]); - } - ncConfig.getConfigManager().set(ncConfig.getNodeId(), NCConfig.Option.IODEVICES, nodeStores); - return ncConfig; - } - - public IHyracksClientConnection getHyracksClientConnection() { - return hcc; - } - - public void deinit(boolean deleteOldInstanceData) throws Exception { - //stop NCs - ArrayList<Thread> stopNCThreads = new ArrayList<>(); - for (NodeControllerService nodeControllerService : ncs) { - if (nodeControllerService != null) { - Thread ncStopThread = new Thread() { - @Override - public void run() { - try { - nodeControllerService.stop(); - } catch (Exception e) { - e.printStackTrace(); - } - } - }; - stopNCThreads.add(ncStopThread); - ncStopThread.start(); - } - } - - //make sure all NCs stopped - for (Thread stopNcTheard : stopNCThreads) { - stopNcTheard.join(); - } - - stopCC(false); - - if (deleteOldInstanceData) { - deleteTransactionLogs(); - removeTestStorageFiles(); - } - } - - public void stopCC(boolean terminateNCService) throws Exception { - if (cc != null) { - cc.stop(terminateNCService); - cc = null; - } - } - - public void setGracefulShutdown(boolean gracefulShutdown) { - this.gracefulShutdown = gracefulShutdown; - } - - protected String getDefaultStoragePath() { - return storagePath; - } - - public void removeTestStorageFiles() { - File dir = new File(getDefaultStoragePath()); - for (String ncName : nodeNames) { - File ncDir = new File(dir, ncName); - FileUtils.deleteQuietly(ncDir); - } - } - - private void deleteTransactionLogs() throws IOException, AsterixException { - for (String ncId : nodeNames) { - File log = new File( - PropertiesAccessor.getInstance(configManager.getAppConfig()).getTransactionLogDirs().get(ncId)); - if (log.exists()) { - FileUtils.deleteDirectory(log); - } - } - } - - protected void run(boolean cleanupOnStart, boolean cleanupOnShutdown, String loadExternalLibs, String confFile) - throws Exception { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - deinit(cleanupOnShutdown); - } catch (Exception e) { - LOGGER.log(Level.WARN, "Unexpected exception on shutdown", e); - } - } - }); - - init(cleanupOnStart, loadExternalLibs, confFile); - while (true) { - Thread.sleep(10000); - } - } - - protected void run(boolean cleanupOnStart, boolean cleanupOnShutdown, String loadExternalLibs) throws Exception { - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - try { - deinit(cleanupOnShutdown); - } catch (Exception e) { - LOGGER.log(Level.WARN, "Unexpected exception on shutdown", e); - } - } - }); - - init(cleanupOnStart, loadExternalLibs); - while (true) { - Thread.sleep(10000); - } - } - - public void addOption(IOption name, Object value) { - opts.add(Pair.of(name, value)); - } - - /** - * @return the asterix-app absolute path if found, otherwise the default user path. - */ - private static Path getProjectPath() { - final String targetDir = "asterix-app"; - final BiPredicate<Path, BasicFileAttributes> matcher = - (path, attributes) -> path.getFileName().toString().equals(targetDir) && path.toFile().isDirectory(); - final Path currentPath = Paths.get(System.getProperty("user.dir")); - try (Stream<Path> pathStream = Files.find(currentPath, 10, matcher)) { - return pathStream.findFirst().orElse(currentPath); - } catch (IOException e) { - throw new IllegalStateException(e); - } - } - - static class LoggerHolder { - static final Logger LOGGER = LogManager.getLogger(); - - private LoggerHolder() { - } - } - - private class UngracefulShutdownNCApplication extends NCApplication { - @Override - public void stop() throws Exception { - // ungraceful shutdown - webManager.stop(); - } - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/3b60f616/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java new file mode 100644 index 0000000..97079eb --- /dev/null +++ b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/AsterixHyracksIntegrationUtil.java @@ -0,0 +1,390 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.api.common; + +import static org.apache.asterix.api.common.AsterixHyracksIntegrationUtil.LoggerHolder.LOGGER; +import static org.apache.hyracks.util.file.FileUtil.joinPath; + +import java.io.File; +import java.io.IOException; +import java.net.Inet4Address; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.attribute.BasicFileAttributes; +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiPredicate; +import java.util.stream.Stream; + +import org.apache.asterix.app.external.ExternalUDFLibrarian; +import org.apache.asterix.common.api.IClusterManagementWork.ClusterState; +import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.config.PropertiesAccessor; +import org.apache.asterix.common.dataflow.ICcApplicationContext; +import org.apache.asterix.common.exceptions.AsterixException; +import org.apache.asterix.common.library.ILibraryManager; +import org.apache.asterix.hyracks.bootstrap.CCApplication; +import org.apache.asterix.hyracks.bootstrap.NCApplication; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hyracks.api.application.ICCApplication; +import org.apache.hyracks.api.application.INCApplication; +import org.apache.hyracks.api.client.HyracksConnection; +import org.apache.hyracks.api.client.IHyracksClientConnection; +import org.apache.hyracks.api.config.IOption; +import org.apache.hyracks.control.cc.ClusterControllerService; +import org.apache.hyracks.control.common.config.ConfigManager; +import org.apache.hyracks.control.common.controllers.CCConfig; +import org.apache.hyracks.control.common.controllers.ControllerConfig; +import org.apache.hyracks.control.common.controllers.NCConfig; +import org.apache.hyracks.control.nc.NodeControllerService; +import org.apache.logging.log4j.Level; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.kohsuke.args4j.CmdLineException; + +@SuppressWarnings({ "squid:ClassVariableVisibilityCheck", "squid:S00112" }) +public class AsterixHyracksIntegrationUtil { + + public static final int DEFAULT_HYRACKS_CC_CLIENT_PORT = 1098; + public static final int DEFAULT_HYRACKS_CC_CLUSTER_PORT = 1099; + public static final String DEFAULT_CONF_FILE = joinPath(getProjectPath().toString(), "src", "test", "resources", + "cc.conf"); + private static final String DEFAULT_STORAGE_PATH = joinPath("target", "io", "dir"); + private static String storagePath = DEFAULT_STORAGE_PATH; + + static { + System.setProperty("java.util.logging.manager", org.apache.logging.log4j.jul.LogManager.class.getName()); + } + + public ClusterControllerService cc; + public NodeControllerService[] ncs = new NodeControllerService[2]; + public IHyracksClientConnection hcc; + protected boolean gracefulShutdown = true; + List<Pair<IOption, Object>> opts = new ArrayList<>(); + private ConfigManager configManager; + private List<String> nodeNames; + + public static void setStoragePath(String path) { + storagePath = path; + } + + public static void restoreDefaultStoragePath() { + storagePath = DEFAULT_STORAGE_PATH; + } + + /** + * main method to run a simple 2 node cluster in-process + * suggested VM arguments: <code>-enableassertions -Xmx2048m -Dfile.encoding=UTF-8</code> + * + * @param args unused + */ + public static void main(String[] args) throws Exception { + AsterixHyracksIntegrationUtil integrationUtil = new AsterixHyracksIntegrationUtil(); + try { + integrationUtil.run(Boolean.getBoolean("cleanup.start"), Boolean.getBoolean("cleanup.shutdown"), + System.getProperty("external.lib", ""), System.getProperty("conf.path", DEFAULT_CONF_FILE)); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Unexpected exception", e); + System.exit(1); + } + } + + public void init(boolean deleteOldInstanceData, String confFile) throws Exception { //NOSONAR + final ICCApplication ccApplication = createCCApplication(); + if (confFile == null) { + configManager = new ConfigManager(); + } else { + configManager = new ConfigManager(new String[] { "-config-file", confFile }); + } + ccApplication.registerConfig(configManager); + final CCConfig ccConfig = createCCConfig(configManager); + cc = new ClusterControllerService(ccConfig, ccApplication); + + nodeNames = ccConfig.getConfigManager().getNodeNames(); + if (deleteOldInstanceData) { + deleteTransactionLogs(); + removeTestStorageFiles(); + } + final List<NodeControllerService> nodeControllers = new ArrayList<>(); + for (String nodeId : nodeNames) { + // mark this NC as virtual in the CC's config manager, so he doesn't try to contact NCService... + configManager.set(nodeId, NCConfig.Option.NCSERVICE_PORT, NCConfig.NCSERVICE_PORT_DISABLED); + final INCApplication ncApplication = createNCApplication(); + ConfigManager ncConfigManager; + if (confFile == null) { + ncConfigManager = new ConfigManager(); + } else { + ncConfigManager = new ConfigManager(new String[] { "-config-file", confFile }); + } + ncApplication.registerConfig(ncConfigManager); + nodeControllers.add(new NodeControllerService(fixupIODevices(createNCConfig(nodeId, ncConfigManager)), + ncApplication)); + } + + opts.stream().forEach(opt -> configManager.set(opt.getLeft(), opt.getRight())); + cc.start(); + + // Starts ncs. + nodeNames = ccConfig.getConfigManager().getNodeNames(); + List<Thread> startupThreads = new ArrayList<>(); + for (NodeControllerService nc : nodeControllers) { + Thread ncStartThread = new Thread("IntegrationUtil-" + nc.getId()) { + @Override + public void run() { + try { + nc.start(); + } catch (Exception e) { + LOGGER.log(Level.ERROR, e.getMessage(), e); + } + } + }; + ncStartThread.start(); + startupThreads.add(ncStartThread); + } + //wait until all NCs complete their startup + for (Thread thread : startupThreads) { + thread.join(); + } + // Wait until cluster becomes active + ((ICcApplicationContext) cc.getApplicationContext()).getClusterStateManager().waitForState(ClusterState.ACTIVE); + hcc = new HyracksConnection(cc.getConfig().getClientListenAddress(), cc.getConfig().getClientListenPort()); + this.ncs = nodeControllers.toArray(new NodeControllerService[nodeControllers.size()]); + } + + public void init(boolean deleteOldInstanceData, String externalLibPath, String confDir) throws Exception { + List<ILibraryManager> libraryManagers = new ArrayList<>(); + ExternalUDFLibrarian librarian = new ExternalUDFLibrarian(libraryManagers); + librarian.cleanup(); + init(deleteOldInstanceData, confDir); + if (externalLibPath != null && externalLibPath.length() != 0) { + libraryManagers.add(((ICcApplicationContext) cc.getApplicationContext()).getLibraryManager()); + for (NodeControllerService nc : ncs) { + INcApplicationContext runtimeCtx = (INcApplicationContext) nc.getApplicationContext(); + libraryManagers.add(runtimeCtx.getLibraryManager()); + } + librarian.install(System.getProperty("external.lib.dataverse", "test"), + System.getProperty("external.lib.libname", "testlib"), externalLibPath); + } + } + + public ClusterControllerService getClusterControllerService() { + return cc; + } + + protected CCConfig createCCConfig(ConfigManager configManager) throws IOException { + CCConfig ccConfig = new CCConfig(configManager); + ccConfig.setClusterListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); + ccConfig.setClientListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); + ccConfig.setClientListenPort(DEFAULT_HYRACKS_CC_CLIENT_PORT); + ccConfig.setClusterListenPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT); + ccConfig.setResultTTL(120000L); + ccConfig.setResultSweepThreshold(1000L); + ccConfig.setEnforceFrameWriterProtocol(true); + configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb")); + return ccConfig; + } + + protected ICCApplication createCCApplication() { + return new CCApplication(); + } + + protected NCConfig createNCConfig(String ncName, ConfigManager configManager) { + NCConfig ncConfig = new NCConfig(ncName, configManager); + ncConfig.setClusterAddress("localhost"); + ncConfig.setClusterPort(DEFAULT_HYRACKS_CC_CLUSTER_PORT); + ncConfig.setClusterListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); + ncConfig.setDataListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); + ncConfig.setResultListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); + ncConfig.setMessagingListenAddress(Inet4Address.getLoopbackAddress().getHostAddress()); + ncConfig.setResultTTL(120000L); + ncConfig.setResultSweepThreshold(1000L); + ncConfig.setVirtualNC(); + configManager.set(ControllerConfig.Option.DEFAULT_DIR, joinPath(getDefaultStoragePath(), "asterixdb", ncName)); + return ncConfig; + } + + protected INCApplication createNCApplication() { + if (!gracefulShutdown) { + return new UngracefulShutdownNCApplication(); + } + return new NCApplication(); + } + + private NCConfig fixupIODevices(NCConfig ncConfig) throws IOException, AsterixException, CmdLineException { + // we have to first process the config + ncConfig.getConfigManager().processConfig(); + + // get initial partitions from config + String[] nodeStores = ncConfig.getNodeScopedAppConfig().getStringArray(NCConfig.Option.IODEVICES); + if (nodeStores == null) { + throw new IllegalStateException("Couldn't find stores for NC: " + ncConfig.getNodeId()); + } + LOGGER.info("Using the path: " + getDefaultStoragePath()); + for (int i = 0; i < nodeStores.length; i++) { + // create IO devices based on stores + nodeStores[i] = joinPath(getDefaultStoragePath(), ncConfig.getNodeId(), nodeStores[i]); + } + ncConfig.getConfigManager().set(ncConfig.getNodeId(), NCConfig.Option.IODEVICES, nodeStores); + return ncConfig; + } + + public IHyracksClientConnection getHyracksClientConnection() { + return hcc; + } + + public void deinit(boolean deleteOldInstanceData) throws Exception { + //stop NCs + ArrayList<Thread> stopNCThreads = new ArrayList<>(); + for (NodeControllerService nodeControllerService : ncs) { + if (nodeControllerService != null) { + Thread ncStopThread = new Thread() { + @Override + public void run() { + try { + nodeControllerService.stop(); + } catch (Exception e) { + e.printStackTrace(); + } + } + }; + stopNCThreads.add(ncStopThread); + ncStopThread.start(); + } + } + + //make sure all NCs stopped + for (Thread stopNcTheard : stopNCThreads) { + stopNcTheard.join(); + } + + stopCC(false); + + if (deleteOldInstanceData) { + deleteTransactionLogs(); + removeTestStorageFiles(); + } + } + + public void stopCC(boolean terminateNCService) throws Exception { + if (cc != null) { + cc.stop(terminateNCService); + cc = null; + } + } + + public void setGracefulShutdown(boolean gracefulShutdown) { + this.gracefulShutdown = gracefulShutdown; + } + + protected String getDefaultStoragePath() { + return storagePath; + } + + public void removeTestStorageFiles() { + File dir = new File(getDefaultStoragePath()); + for (String ncName : nodeNames) { + File ncDir = new File(dir, ncName); + FileUtils.deleteQuietly(ncDir); + } + } + + private void deleteTransactionLogs() throws IOException, AsterixException { + for (String ncId : nodeNames) { + File log = new File( + PropertiesAccessor.getInstance(configManager.getAppConfig()).getTransactionLogDirs().get(ncId)); + if (log.exists()) { + FileUtils.deleteDirectory(log); + } + } + } + + protected void run(boolean cleanupOnStart, boolean cleanupOnShutdown, String loadExternalLibs, String confFile) + throws Exception { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + deinit(cleanupOnShutdown); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Unexpected exception on shutdown", e); + } + } + }); + + init(cleanupOnStart, loadExternalLibs, confFile); + while (true) { + Thread.sleep(10000); + } + } + + protected void run(boolean cleanupOnStart, boolean cleanupOnShutdown, String loadExternalLibs) throws Exception { + Runtime.getRuntime().addShutdownHook(new Thread() { + @Override + public void run() { + try { + deinit(cleanupOnShutdown); + } catch (Exception e) { + LOGGER.log(Level.WARN, "Unexpected exception on shutdown", e); + } + } + }); + + init(cleanupOnStart, loadExternalLibs); + while (true) { + Thread.sleep(10000); + } + } + + public void addOption(IOption name, Object value) { + opts.add(Pair.of(name, value)); + } + + /** + * @return the asterix-app absolute path if found, otherwise the default user path. + */ + private static Path getProjectPath() { + final String targetDir = "asterix-app"; + final BiPredicate<Path, BasicFileAttributes> matcher = + (path, attributes) -> path.getFileName().toString().equals(targetDir) && path.toFile().isDirectory(); + final Path currentPath = Paths.get(System.getProperty("user.dir")); + try (Stream<Path> pathStream = Files.find(currentPath, 10, matcher)) { + return pathStream.findFirst().orElse(currentPath); + } catch (IOException e) { + throw new IllegalStateException(e); + } + } + + static class LoggerHolder { + static final Logger LOGGER = LogManager.getLogger(); + + private LoggerHolder() { + } + } + + private class UngracefulShutdownNCApplication extends NCApplication { + @Override + public void stop() throws Exception { + // ungraceful shutdown + webManager.stop(); + } + } + +}
