Introduced distributed configuration management to workflow and resource managers + tests
Project: http://git-wip-us.apache.org/repos/asf/oodt/repo Commit: http://git-wip-us.apache.org/repos/asf/oodt/commit/fc6311db Tree: http://git-wip-us.apache.org/repos/asf/oodt/tree/fc6311db Diff: http://git-wip-us.apache.org/repos/asf/oodt/diff/fc6311db Branch: refs/heads/development Commit: fc6311db2ed4e5ccc6129cb88c71bb5428037b98 Parents: 3df0e43 Author: Imesha Sudasingha <imesha.sudasin...@gmail.com> Authored: Wed Aug 16 01:17:42 2017 +0530 Committer: Imesha Sudasingha <imesha.sudasin...@gmail.com> Committed: Wed Aug 16 01:17:42 2017 +0530 ---------------------------------------------------------------------- config/pom.xml | 2 +- .../java/org/apache/oodt/config/Component.java | 3 +- .../config/ConfigurationManagerFactory.java | 11 +- .../TestDistributedXmlRpcFileManager.java | 230 ------------------- .../TestDistributedXmlRpcFileManager.java | 230 +++++++++++++++++++ .../distributed/config/cmd-line-actions.xml | 38 --- .../distributed/config/cmd-line-options.xml | 85 ------- resource/pom.xml | 8 + .../resource/system/XmlRpcResourceManager.java | 48 ++-- .../system/TestXmlRpcResourceManager.java | 19 +- .../TestDistributedXmlRpcResourceManager.java | 83 +++++++ .../config/distributed/config-publisher.xml | 40 ++++ .../config/distributed/resource.properties | 75 ++++++ workflow/pom.xml | 8 + .../workflow/system/XmlRpcWorkflowManager.java | 34 ++- .../TestDistributedXmlRpcWorkflowManager.java | 136 +++++++++++ .../config/distributed/config-publisher.xml | 56 +++++ .../config/distributed/workflow.properties | 87 +++++++ 18 files changed, 792 insertions(+), 401 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/config/pom.xml ---------------------------------------------------------------------- diff --git a/config/pom.xml b/config/pom.xml index 3380848..3f26499 100644 --- a/config/pom.xml +++ b/config/pom.xml @@ -5,7 +5,7 @@ <parent> <artifactId>oodt-core</artifactId> 
<groupId>org.apache.oodt</groupId> - <version>1.2-SNAPSHOT</version> + <version>1.3-SNAPSHOT</version> <relativePath>../core/pom.xml</relativePath> </parent> <modelVersion>4.0.0</modelVersion> http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/config/src/main/java/org/apache/oodt/config/Component.java ---------------------------------------------------------------------- diff --git a/config/src/main/java/org/apache/oodt/config/Component.java b/config/src/main/java/org/apache/oodt/config/Component.java index 2c7123e..2c755c1 100644 --- a/config/src/main/java/org/apache/oodt/config/Component.java +++ b/config/src/main/java/org/apache/oodt/config/Component.java @@ -25,7 +25,8 @@ package org.apache.oodt.config; */ public enum Component { FILE_MANAGER("filemgr", "FILEMGR_HOME"), - RESOURCE_MANAGER("resmgr", "RESMGR_HOME"); + RESOURCE_MANAGER("resmgr", "RESMGR_HOME"), + WORKFLOW_MANAGER("wmgr", "WORKFLOW_HOME"); /** Shorthand name of the component. Will be used when creating ZNodes in zookeeper */ String name; http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/config/src/main/java/org/apache/oodt/config/ConfigurationManagerFactory.java ---------------------------------------------------------------------- diff --git a/config/src/main/java/org/apache/oodt/config/ConfigurationManagerFactory.java b/config/src/main/java/org/apache/oodt/config/ConfigurationManagerFactory.java index f3a5dc5..67591a0 100644 --- a/config/src/main/java/org/apache/oodt/config/ConfigurationManagerFactory.java +++ b/config/src/main/java/org/apache/oodt/config/ConfigurationManagerFactory.java @@ -54,12 +54,13 @@ public class ConfigurationManagerFactory { * @return ConfigurationManager instance to used by the corresponding component. 
*/ public static ConfigurationManager getConfigurationManager(Component component, List<String> propertiesFiles) { - boolean isDistributed = Boolean.getBoolean(ENABLE_DISTRIBUTED_CONFIGURATION); - if (!isDistributed) { + String enableDistributed = System.getProperty(ENABLE_DISTRIBUTED_CONFIGURATION); + boolean isDistributed; + if (enableDistributed == null) { String env = System.getenv(Env.ENABLE_DISTRIBUTED_CONFIGURATION); - if (env != null) { - isDistributed = Boolean.parseBoolean(env); - } + isDistributed = Boolean.parseBoolean(env); + } else { + isDistributed = Boolean.parseBoolean(enableDistributed); } if (isDistributed) { http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/filemgr/src/test/java/org/apache/oodt/cas/filemgr/config/distributed/TestDistributedXmlRpcFileManager.java ---------------------------------------------------------------------- diff --git a/filemgr/src/test/java/org/apache/oodt/cas/filemgr/config/distributed/TestDistributedXmlRpcFileManager.java b/filemgr/src/test/java/org/apache/oodt/cas/filemgr/config/distributed/TestDistributedXmlRpcFileManager.java deleted file mode 100644 index 19a8e56..0000000 --- a/filemgr/src/test/java/org/apache/oodt/cas/filemgr/config/distributed/TestDistributedXmlRpcFileManager.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.oodt.cas.filemgr.config.distributed; - -import org.apache.oodt.cas.filemgr.ingest.StdIngester; -import org.apache.oodt.cas.filemgr.metadata.CoreMetKeys; -import org.apache.oodt.cas.filemgr.metadata.ProductMetKeys; -import org.apache.oodt.cas.filemgr.structs.Product; -import org.apache.oodt.cas.filemgr.structs.exceptions.CatalogException; -import org.apache.oodt.cas.filemgr.system.XmlRpcFileManager; -import org.apache.oodt.cas.filemgr.system.XmlRpcFileManagerClient; -import org.apache.oodt.cas.metadata.Metadata; -import org.apache.oodt.cas.metadata.SerializableMetadata; -import org.apache.oodt.cas.metadata.util.PathUtils; -import org.apache.oodt.config.distributed.cli.ConfigPublisher; -import org.apache.oodt.config.test.AbstractDistributedConfigurationTest; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; - -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.net.URL; -import java.util.Collections; - -import static org.apache.oodt.config.Constants.Properties.ENABLE_DISTRIBUTED_CONFIGURATION; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; - -/** - * Tests the {@link XmlRpcFileManager} with distributed configuration management enabled. This test will first publish - * the required configuration to zookeeper through {@link ConfigPublisher} and will start the {@link XmlRpcFileManager} - * which will first download and store published files locally. Then the correct functionality of {@link - * XmlRpcFileManager} is tested by using those downloaded configuration files for configuration. 
- * <p> - * This class is adapted from {@link org.apache.oodt.cas.filemgr.system.TestXmlRpcFileManager} class - * - * @author Imesha Sudasingha - */ -public class TestDistributedXmlRpcFileManager extends AbstractDistributedConfigurationTest { - - private static final int FM_PORT = 9001; - private static final String CONF_PUBLISHER_XML = "distributed/config/config-publisher.xml"; - private static final String TRANSFER_SERVICE_FACTORY_CLASS = "org.apache.oodt.cas.filemgr.datatransfer.LocalDataTransferFactory"; - - private XmlRpcFileManager fileManager; - - @Before - public void setUpTest() throws Exception { - System.setProperty("org.apache.oodt.cas.cli.action.spring.config", "src/test/resources/distributed/config/cmd-line-actions.xml"); - System.setProperty("org.apache.oodt.cas.cli.option.spring.config", "src/test/resources/distributed/config/cmd-line-options.xml"); - System.setProperty(ENABLE_DISTRIBUTED_CONFIGURATION, "true"); - - ConfigPublisher.main(new String[]{ - "-connectString", zookeeper.getConnectString(), - "-config", CONF_PUBLISHER_XML, - "-a", "publish" - }); - - try { - fileManager = new XmlRpcFileManager(FM_PORT); - } catch (Exception e) { - fail(e.getMessage()); - } - - ingestFile(); - } - - @Test - public void testDistributedConfigurationWithFileManager() { - XmlRpcFileManagerClient fmc = null; - try { - fmc = new XmlRpcFileManagerClient(new URL("http://localhost:" + FM_PORT)); - } catch (Exception e) { - fail(e.getMessage()); - } - - Metadata met = null; - try { - met = fmc.getMetadata(fmc.getProductByName("test.txt")); - } catch (CatalogException e) { - fail(e.getMessage()); - } - - assertNotNull(met); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_ID)); - assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_ID)); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_NAME)); - assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_NAME)); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_STRUCTURE)); - assertEquals("Flat", 
met.getMetadata(ProductMetKeys.PRODUCT_STRUCTURE)); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_TRANSFER_STATUS)); - assertEquals(Product.STATUS_RECEIVED, met.getMetadata(ProductMetKeys.PRODUCT_TRANSFER_STATUS)); - - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_ORIG_REFS)); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_DATASTORE_REFS)); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_FILE_SIZES)); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_MIME_TYPES)); - - assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_ORIG_REFS).size()); - assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_DATASTORE_REFS).size()); - assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_FILE_SIZES).size()); - assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_MIME_TYPES).size()); - - URL refUrl = this.getClass().getResource("/ingest/test.txt"); - - String origPath = null; - try { - origPath = new File(refUrl.getFile()).getCanonicalPath(); - } catch (IOException e) { - fail(e.getMessage()); - } - assertEquals(origPath, met.getMetadata(ProductMetKeys.PRODUCT_ORIG_REFS)); - assertEquals("/tmp/test.txt/test.txt", met.getMetadata(ProductMetKeys.PRODUCT_DATASTORE_REFS)); - - assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_FILE_SIZES)); - assertEquals("text/plain", met.getMetadata(ProductMetKeys.PRODUCT_MIME_TYPES)); - - try { - met = fmc.getReducedMetadata(fmc.getProductByName("test.txt"), Collections.EMPTY_LIST); - } catch (CatalogException e) { - fail(e.getMessage()); - } - - assertNotNull(met); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_ID)); - assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_ID)); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_NAME)); - assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_NAME)); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_STRUCTURE)); - assertEquals("Flat", met.getMetadata(ProductMetKeys.PRODUCT_STRUCTURE)); - 
assertTrue(met.containsKey(ProductMetKeys.PRODUCT_TRANSFER_STATUS)); - assertEquals(Product.STATUS_RECEIVED, met.getMetadata(ProductMetKeys.PRODUCT_TRANSFER_STATUS)); - - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_ORIG_REFS)); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_DATASTORE_REFS)); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_FILE_SIZES)); - assertTrue(met.containsKey(ProductMetKeys.PRODUCT_MIME_TYPES)); - - assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_ORIG_REFS).size()); - assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_DATASTORE_REFS).size()); - assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_FILE_SIZES).size()); - assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_MIME_TYPES).size()); - - origPath = null; - try { - origPath = new File(refUrl.getFile()).getCanonicalPath(); - } catch (IOException e) { - fail(e.getMessage()); - } - assertEquals(origPath, met.getMetadata(ProductMetKeys.PRODUCT_ORIG_REFS)); - assertEquals("/tmp/test.txt/test.txt", met.getMetadata(ProductMetKeys.PRODUCT_DATASTORE_REFS)); - - assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_FILE_SIZES)); - assertEquals("text/plain", met.getMetadata(ProductMetKeys.PRODUCT_MIME_TYPES)); - } - - private void ingestFile() { - StdIngester ingester = new StdIngester(TRANSFER_SERVICE_FACTORY_CLASS); - - try { - URL ingestUrl = this.getClass().getResource("/ingest"); - URL refUrl = this.getClass().getResource("/ingest/test.txt"); - URL metUrl = this.getClass().getResource("/ingest/test.txt.met"); - Metadata prodMet = new SerializableMetadata(new FileInputStream(new File(metUrl.getFile()))); - - // now add the right file location - prodMet.addMetadata(CoreMetKeys.FILE_LOCATION, new File(ingestUrl.getFile()).getCanonicalPath()); - prodMet.addMetadata(CoreMetKeys.FILENAME, "test.txt"); - prodMet.addMetadata(CoreMetKeys.PRODUCT_TYPE, "GenericFile"); - ingester.ingest(new URL("http://localhost:" + FM_PORT), new File(refUrl.getFile()), prodMet); 
- } catch (Exception e) { - fail(e.getMessage()); - } - } - - @After - public void tearDownTest() throws Exception { - if (fileManager != null) { - fileManager.shutdown(); - } - - ConfigPublisher.main(new String[]{ - "-connectString", zookeeper.getConnectString(), - "-config", CONF_PUBLISHER_XML, - "-a", "clear" - }); - - String luceneIdx = System.getProperty("org.apache.oodt.cas.filemgr.catalog.lucene.idxPath"); - if (luceneIdx != null) { - luceneIdx = PathUtils.replaceEnvVariables(luceneIdx); - deleteAllFiles(luceneIdx); - } - - System.clearProperty("org.apache.oodt.cas.cli.action.spring.config"); - System.clearProperty("org.apache.oodt.cas.cli.option.spring.config"); - System.clearProperty(ENABLE_DISTRIBUTED_CONFIGURATION); - } - - private void deleteAllFiles(String startDir) { - File startDirFile = new File(startDir); - File[] delFiles = startDirFile.listFiles(); - - if (delFiles != null && delFiles.length > 0) { - for (File delFile : delFiles) { - delFile.delete(); - } - } - - startDirFile.delete(); - } -} http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/filemgr/src/test/java/org/apache/oodt/cas/filemgr/system/distributed/TestDistributedXmlRpcFileManager.java ---------------------------------------------------------------------- diff --git a/filemgr/src/test/java/org/apache/oodt/cas/filemgr/system/distributed/TestDistributedXmlRpcFileManager.java b/filemgr/src/test/java/org/apache/oodt/cas/filemgr/system/distributed/TestDistributedXmlRpcFileManager.java new file mode 100644 index 0000000..938a89b --- /dev/null +++ b/filemgr/src/test/java/org/apache/oodt/cas/filemgr/system/distributed/TestDistributedXmlRpcFileManager.java @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oodt.cas.filemgr.system.distributed; + +import org.apache.oodt.cas.filemgr.ingest.StdIngester; +import org.apache.oodt.cas.filemgr.metadata.CoreMetKeys; +import org.apache.oodt.cas.filemgr.metadata.ProductMetKeys; +import org.apache.oodt.cas.filemgr.structs.Product; +import org.apache.oodt.cas.filemgr.structs.exceptions.CatalogException; +import org.apache.oodt.cas.filemgr.system.XmlRpcFileManager; +import org.apache.oodt.cas.filemgr.system.XmlRpcFileManagerClient; +import org.apache.oodt.cas.metadata.Metadata; +import org.apache.oodt.cas.metadata.SerializableMetadata; +import org.apache.oodt.cas.metadata.util.PathUtils; +import org.apache.oodt.config.distributed.cli.ConfigPublisher; +import org.apache.oodt.config.test.AbstractDistributedConfigurationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.net.URL; +import java.util.Collections; + +import static org.apache.oodt.config.Constants.Properties.ENABLE_DISTRIBUTED_CONFIGURATION; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests the {@link XmlRpcFileManager} with distributed configuration management enabled. 
This test will first publish + * the required configuration to zookeeper through {@link ConfigPublisher} and will start the {@link XmlRpcFileManager} + * which will first download and store published files locally. Then the correct functionality of {@link + * XmlRpcFileManager} is tested by using those downloaded configuration files for configuration. + * <p> + * This class is adapted from {@link org.apache.oodt.cas.filemgr.system.TestXmlRpcFileManager} class + * + * @author Imesha Sudasingha + */ +public class TestDistributedXmlRpcFileManager extends AbstractDistributedConfigurationTest { + + private static final int FM_PORT = 9001; + private static final String CONF_PUBLISHER_XML = "distributed/config/config-publisher.xml"; + private static final String TRANSFER_SERVICE_FACTORY_CLASS = "org.apache.oodt.cas.filemgr.datatransfer.LocalDataTransferFactory"; + + private XmlRpcFileManager fileManager; + + @Before + public void setUpTest() throws Exception { + System.setProperty("org.apache.oodt.cas.cli.action.spring.config", "../config/src/main/resources/cmd-line-actions.xml"); + System.setProperty("org.apache.oodt.cas.cli.option.spring.config", "../config/src/main/resources/cmd-line-options.xml"); + System.setProperty(ENABLE_DISTRIBUTED_CONFIGURATION, "true"); + + ConfigPublisher.main(new String[]{ + "-connectString", zookeeper.getConnectString(), + "-config", CONF_PUBLISHER_XML, + "-a", "publish" + }); + + try { + fileManager = new XmlRpcFileManager(FM_PORT); + } catch (Exception e) { + fail(e.getMessage()); + } + + ingestFile(); + } + + @Test + public void testDistributedConfigurationWithFileManager() { + XmlRpcFileManagerClient fmc = null; + try { + fmc = new XmlRpcFileManagerClient(new URL("http://localhost:" + FM_PORT)); + } catch (Exception e) { + fail(e.getMessage()); + } + + Metadata met = null; + try { + met = fmc.getMetadata(fmc.getProductByName("test.txt")); + } catch (CatalogException e) { + fail(e.getMessage()); + } + + assertNotNull(met); + 
assertTrue(met.containsKey(ProductMetKeys.PRODUCT_ID)); + assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_ID)); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_NAME)); + assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_NAME)); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_STRUCTURE)); + assertEquals("Flat", met.getMetadata(ProductMetKeys.PRODUCT_STRUCTURE)); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_TRANSFER_STATUS)); + assertEquals(Product.STATUS_RECEIVED, met.getMetadata(ProductMetKeys.PRODUCT_TRANSFER_STATUS)); + + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_ORIG_REFS)); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_DATASTORE_REFS)); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_FILE_SIZES)); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_MIME_TYPES)); + + assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_ORIG_REFS).size()); + assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_DATASTORE_REFS).size()); + assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_FILE_SIZES).size()); + assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_MIME_TYPES).size()); + + URL refUrl = this.getClass().getResource("/ingest/test.txt"); + + String origPath = null; + try { + origPath = new File(refUrl.getFile()).getCanonicalPath(); + } catch (IOException e) { + fail(e.getMessage()); + } + assertEquals(origPath, met.getMetadata(ProductMetKeys.PRODUCT_ORIG_REFS)); + assertEquals("/tmp/test.txt/test.txt", met.getMetadata(ProductMetKeys.PRODUCT_DATASTORE_REFS)); + + assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_FILE_SIZES)); + assertEquals("text/plain", met.getMetadata(ProductMetKeys.PRODUCT_MIME_TYPES)); + + try { + met = fmc.getReducedMetadata(fmc.getProductByName("test.txt"), Collections.EMPTY_LIST); + } catch (CatalogException e) { + fail(e.getMessage()); + } + + assertNotNull(met); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_ID)); + assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_ID)); + 
assertTrue(met.containsKey(ProductMetKeys.PRODUCT_NAME)); + assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_NAME)); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_STRUCTURE)); + assertEquals("Flat", met.getMetadata(ProductMetKeys.PRODUCT_STRUCTURE)); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_TRANSFER_STATUS)); + assertEquals(Product.STATUS_RECEIVED, met.getMetadata(ProductMetKeys.PRODUCT_TRANSFER_STATUS)); + + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_ORIG_REFS)); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_DATASTORE_REFS)); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_FILE_SIZES)); + assertTrue(met.containsKey(ProductMetKeys.PRODUCT_MIME_TYPES)); + + assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_ORIG_REFS).size()); + assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_DATASTORE_REFS).size()); + assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_FILE_SIZES).size()); + assertEquals(1, met.getAllMetadata(ProductMetKeys.PRODUCT_MIME_TYPES).size()); + + origPath = null; + try { + origPath = new File(refUrl.getFile()).getCanonicalPath(); + } catch (IOException e) { + fail(e.getMessage()); + } + assertEquals(origPath, met.getMetadata(ProductMetKeys.PRODUCT_ORIG_REFS)); + assertEquals("/tmp/test.txt/test.txt", met.getMetadata(ProductMetKeys.PRODUCT_DATASTORE_REFS)); + + assertNotNull(met.getMetadata(ProductMetKeys.PRODUCT_FILE_SIZES)); + assertEquals("text/plain", met.getMetadata(ProductMetKeys.PRODUCT_MIME_TYPES)); + } + + private void ingestFile() { + StdIngester ingester = new StdIngester(TRANSFER_SERVICE_FACTORY_CLASS); + + try { + URL ingestUrl = this.getClass().getResource("/ingest"); + URL refUrl = this.getClass().getResource("/ingest/test.txt"); + URL metUrl = this.getClass().getResource("/ingest/test.txt.met"); + Metadata prodMet = new SerializableMetadata(new FileInputStream(new File(metUrl.getFile()))); + + // now add the right file location + prodMet.addMetadata(CoreMetKeys.FILE_LOCATION, new 
File(ingestUrl.getFile()).getCanonicalPath()); + prodMet.addMetadata(CoreMetKeys.FILENAME, "test.txt"); + prodMet.addMetadata(CoreMetKeys.PRODUCT_TYPE, "GenericFile"); + ingester.ingest(new URL("http://localhost:" + FM_PORT), new File(refUrl.getFile()), prodMet); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @After + public void tearDownTest() throws Exception { + if (fileManager != null) { + fileManager.shutdown(); + } + + ConfigPublisher.main(new String[]{ + "-connectString", zookeeper.getConnectString(), + "-config", CONF_PUBLISHER_XML, + "-a", "clear" + }); + + String luceneIdx = System.getProperty("org.apache.oodt.cas.filemgr.catalog.lucene.idxPath"); + if (luceneIdx != null) { + luceneIdx = PathUtils.replaceEnvVariables(luceneIdx); + deleteAllFiles(luceneIdx); + } + + System.clearProperty("org.apache.oodt.cas.cli.action.spring.config"); + System.clearProperty("org.apache.oodt.cas.cli.option.spring.config"); + System.clearProperty(ENABLE_DISTRIBUTED_CONFIGURATION); + } + + private void deleteAllFiles(String startDir) { + File startDirFile = new File(startDir); + File[] delFiles = startDirFile.listFiles(); + + if (delFiles != null && delFiles.length > 0) { + for (File delFile : delFiles) { + delFile.delete(); + } + } + + startDirFile.delete(); + } +} http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/filemgr/src/test/resources/distributed/config/cmd-line-actions.xml ---------------------------------------------------------------------- diff --git a/filemgr/src/test/resources/distributed/config/cmd-line-actions.xml b/filemgr/src/test/resources/distributed/config/cmd-line-actions.xml deleted file mode 100644 index 08f5415..0000000 --- a/filemgr/src/test/resources/distributed/config/cmd-line-actions.xml +++ /dev/null @@ -1,38 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. 
See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> - -<beans xmlns="http://www.springframework.org/schema/beans" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" - xsi:schemaLocation="http://www.springframework.org/schema/beans - http://www.springframework.org/schema/beans/spring-beans-2.0.xsd"> - - <bean id="publish" class="org.apache.oodt.config.distributed.cli.CLIAction"> - <constructor-arg value="PUBLISH"/> - <property name="description" value="Publishes configuration to zookeeper"/> - </bean> - - <bean id="verify" class="org.apache.oodt.config.distributed.cli.CLIAction"> - <constructor-arg value="VERIFY"/> - <property name="description" value="Verifies configuration published to zookeeper"/> - </bean> - - <bean id="clear" class="org.apache.oodt.config.distributed.cli.CLIAction"> - <constructor-arg value="CLEAR"/> - <property name="description" value="Clears all published configuration from zookeeper"/> - </bean> -</beans> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/filemgr/src/test/resources/distributed/config/cmd-line-options.xml ---------------------------------------------------------------------- diff --git a/filemgr/src/test/resources/distributed/config/cmd-line-options.xml 
b/filemgr/src/test/resources/distributed/config/cmd-line-options.xml deleted file mode 100644 index 698ad32..0000000 --- a/filemgr/src/test/resources/distributed/config/cmd-line-options.xml +++ /dev/null @@ -1,85 +0,0 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. 
- --> - -<beans xmlns="http://www.springframework.org/schema/beans" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p" - xsi:schemaLocation="http://www.springframework.org/schema/beans - http://www.springframework.org/schema/beans/spring-beans-2.0.xsd"> - - <bean id="connectStringOption" class="org.apache.oodt.cas.cli.option.AdvancedCmdLineOption"> - <property name="shortOption" value="cs"/> - <property name="longOption" value="connectString"/> - <property name="description" value="Connect String to Zookeeper (ip1:port1,ip2:port2,..)"/> - <property name="hasArgs" value="true"/> - <property name="requirementRules"> - <list> - <bean class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule" - p:actionName="publish" p:relation="REQUIRED"/> - <bean class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule" - p:actionName="clear" p:relation="REQUIRED"/> - <bean class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule" - p:actionName="verify" p:relation="REQUIRED"/> - </list> - </property> - <property name="handler"> - <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToActionHandler"> - <property name="applyToActions"> - <list> - <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToAction" - p:actionName="publish" p:methodName="setConnectString"/> - <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToAction" - p:actionName="verify" p:methodName="setConnectString"/> - <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToAction" - p:actionName="clear" p:methodName="setConnectString"/> - </list> - </property> - </bean> - </property> - </bean> - - <bean id="publisherXML" class="org.apache.oodt.cas.cli.option.AdvancedCmdLineOption"> - <property name="shortOption" value="c"/> - <property name="longOption" value="config"/> - <property name="description" value="Configuration publisher spring configuration XML"/> - <property name="hasArgs" value="true"/> - <property 
name="requirementRules"> - <list> - <bean class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule" - p:actionName="publish" p:relation="OPTIONAL"/> - <bean class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule" - p:actionName="clear" p:relation="OPTIONAL"/> - <bean class="org.apache.oodt.cas.cli.option.require.ActionDependencyRule" - p:actionName="verify" p:relation="OPTIONAL"/> - </list> - </property> - <property name="handler"> - <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToActionHandler"> - <property name="applyToActions"> - <list> - <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToAction" - p:actionName="publish" p:methodName="setConfigFile"/> - <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToAction" - p:actionName="verify" p:methodName="setConfigFile"/> - <bean class="org.apache.oodt.cas.cli.option.handler.ApplyToAction" - p:actionName="clear" p:methodName="setConfigFile"/> - </list> - </property> - </bean> - </property> - </bean> -</beans> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/resource/pom.xml ---------------------------------------------------------------------- diff --git a/resource/pom.xml b/resource/pom.xml index d9d23df..a1197b7 100644 --- a/resource/pom.xml +++ b/resource/pom.xml @@ -110,6 +110,10 @@ the License. <artifactId>cas-metadata</artifactId> </dependency> <dependency> + <groupId>org.apache.oodt</groupId> + <artifactId>oodt-conf</artifactId> + </dependency> + <dependency> <groupId>org.safehaus.jug</groupId> <artifactId>jug</artifactId> <classifier>asl</classifier> @@ -138,6 +142,10 @@ the License. 
<value>${basedir}/src/test/resources/test.logging.properties</value> </property> </systemProperties> + <environmentVariables> + <RESMGR_HOME>${project.basedir}</RESMGR_HOME> + <OODT_PROJECT>primary</OODT_PROJECT> + </environmentVariables> <forkedProcessTimeoutInSeconds>0</forkedProcessTimeoutInSeconds> <redirectTestOutputToFile>true</redirectTestOutputToFile> <includes> http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java ---------------------------------------------------------------------- diff --git a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java index 87f6bea..3536607 100644 --- a/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java +++ b/resource/src/main/java/org/apache/oodt/cas/resource/system/XmlRpcResourceManager.java @@ -32,13 +32,15 @@ import org.apache.oodt.cas.resource.structs.exceptions.SchedulerException; import org.apache.oodt.cas.resource.util.GenericResourceManagerObjectFactory; import org.apache.oodt.cas.resource.util.ResourceNodeComparator; import org.apache.oodt.cas.resource.util.XmlRpcStructFactory; +import org.apache.oodt.config.Component; +import org.apache.oodt.config.ConfigurationManager; +import org.apache.oodt.config.ConfigurationManagerFactory; import org.apache.xmlrpc.WebServer; -import java.io.File; -import java.io.FileInputStream; import java.io.IOException; import java.net.MalformedURLException; import java.net.URL; +import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.Hashtable; @@ -69,16 +71,22 @@ public class XmlRpcResourceManager { /* our scheduler */ private Scheduler scheduler = null; + /** Configuration manager that loads and clears this resource manager's configuration */ + private ConfigurationManager configurationManager; + public XmlRpcResourceManager(int port) 
throws IOException { - // load properties from workflow manager properties file, if specified + List<String> propertiesFiles = new ArrayList<>(); + // set up the configuration, if there is any if (System.getProperty("org.apache.oodt.cas.resource.properties") != null) { - String configFile = System - .getProperty("org.apache.oodt.cas.resource.properties"); - LOG.log(Level.INFO, - "Loading Resource Manager Configuration Properties from: [" - + configFile + "]"); - System.getProperties().load( - new FileInputStream(new File(configFile))); + propertiesFiles.add(System.getProperty("org.apache.oodt.cas.resource.properties")); + } + + configurationManager = ConfigurationManagerFactory.getConfigurationManager(Component.RESOURCE_MANAGER, propertiesFiles); + try { + configurationManager.loadConfiguration(); + } catch (Exception e) { + LOG.log(Level.SEVERE, "Unable to load configuration", e); + throw new IOException("Unable to load configuration", e); } String schedulerClassStr = System.getProperty( @@ -323,15 +331,17 @@ public class XmlRpcResourceManager { public List<String> getQueuesWithNode(String nodeId) { return new Vector<String>(this.scheduler.getQueueManager().getQueues(nodeId)); } - - public boolean shutdown(){ - if (this.webServer != null) { - this.webServer.shutdown(); - this.webServer = null; - return true; - } else { - return false; - } + + public boolean shutdown() { + configurationManager.clearConfiguration(); + + if (this.webServer != null) { + this.webServer.shutdown(); + this.webServer = null; + return true; + } else { + return false; + } } public String getNodeLoad(String nodeId) throws MonitorException{ http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java ---------------------------------------------------------------------- diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java 
b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java index 342194e..f0358b6 100644 --- a/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java +++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/TestXmlRpcResourceManager.java @@ -77,7 +77,6 @@ public class TestXmlRpcResourceManager extends TestCase { fail(e.getMessage()); } assertEquals(8, setCapacity); - } /* @@ -116,7 +115,7 @@ public class TestXmlRpcResourceManager extends TestCase { } - private void generateTestConfiguration() throws IOException { + private void generateTestConfiguration() throws IOException { Properties config = new Properties(); String propertiesFile = "." + File.separator + "src" + File.separator + @@ -131,13 +130,13 @@ public class TestXmlRpcResourceManager extends TestCase { fail(e.getMessage()); } for (File policyFile : new File("./src/test/resources/policy") - .listFiles(new FileFilter() { + .listFiles(new FileFilter() { - @Override - public boolean accept(File pathname) { - return pathname.isFile() && pathname.getName().endsWith(".xml"); - } - })) { + @Override + public boolean accept(File pathname) { + return pathname.isFile() && pathname.getName().endsWith(".xml"); + } + })) { try { FileUtils.copyFileToDirectory(policyFile, tmpPolicyDir); } catch (Exception e) { @@ -146,9 +145,9 @@ public class TestXmlRpcResourceManager extends TestCase { } config.setProperty("org.apache.oodt.cas.resource.nodes.dirs", tmpPolicyDir - .toURI().toString()); + .toURI().toString()); config.setProperty("org.apache.oodt.cas.resource.nodetoqueues.dirs", - tmpPolicyDir.toURI().toString()); + tmpPolicyDir.toURI().toString()); System.getProperties().putAll(config); this.tmpPolicyDir = tmpPolicyDir; http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java 
---------------------------------------------------------------------- diff --git a/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java new file mode 100644 index 0000000..0649b44 --- /dev/null +++ b/resource/src/test/java/org/apache/oodt/cas/resource/system/distributed/TestDistributedXmlRpcResourceManager.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oodt.cas.resource.system.distributed; + +import org.apache.oodt.cas.resource.system.TestXmlRpcResourceManager; +import org.apache.oodt.cas.resource.system.XmlRpcResourceManager; +import org.apache.oodt.config.distributed.cli.ConfigPublisher; +import org.apache.oodt.config.test.AbstractDistributedConfigurationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import static org.apache.oodt.config.Constants.Properties.ENABLE_DISTRIBUTED_CONFIGURATION; +import static org.junit.Assert.fail; + +/** + * Test the operation of Resource Manager under distributed configuration management enabled + * + * @author Imesha Sudasingha + */ +public class TestDistributedXmlRpcResourceManager extends AbstractDistributedConfigurationTest { + + private static final int RM_PORT = 50001; + private static final String CONF_PUBLISHER_XML = "config/distributed/config-publisher.xml"; + + private XmlRpcResourceManager resourceManager; + + @Before + public void setUpTest() throws Exception { + System.setProperty("org.apache.oodt.cas.cli.action.spring.config", "../config/src/main/resources/cmd-line-actions.xml"); + System.setProperty("org.apache.oodt.cas.cli.option.spring.config", "../config/src/main/resources/cmd-line-options.xml"); + System.setProperty(ENABLE_DISTRIBUTED_CONFIGURATION, "true"); + + ConfigPublisher.main(new String[]{ + "-connectString", zookeeper.getConnectString(), + "-config", CONF_PUBLISHER_XML, + "-a", "publish" + }); + + try { + resourceManager = new XmlRpcResourceManager(RM_PORT); + } catch (Exception e) { + fail(e.getMessage()); + } + } + + @Test + public void testDynSetNodeCapacity() { + new TestXmlRpcResourceManager().testDynSetNodeCapacity(); + } + + @After + public void tearDownTest() throws Exception { + if (resourceManager != null) { + resourceManager.shutdown(); + } + + ConfigPublisher.main(new String[]{ + "-connectString", zookeeper.getConnectString(), + "-config", CONF_PUBLISHER_XML, + "-a", "clear" + 
}); + + System.clearProperty("org.apache.oodt.cas.cli.action.spring.config"); + System.clearProperty("org.apache.oodt.cas.cli.option.spring.config"); + System.clearProperty(ENABLE_DISTRIBUTED_CONFIGURATION); + } +} http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/resource/src/test/resources/config/distributed/config-publisher.xml ---------------------------------------------------------------------- diff --git a/resource/src/test/resources/config/distributed/config-publisher.xml b/resource/src/test/resources/config/distributed/config-publisher.xml new file mode 100644 index 0000000..ab7a0ce --- /dev/null +++ b/resource/src/test/resources/config/distributed/config-publisher.xml @@ -0,0 +1,40 @@ +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. 
+ --> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"> + + <bean id="resmgr-config-publisher" class="org.apache.oodt.config.distributed.DistributedConfigurationPublisher"> + <constructor-arg value="RESOURCE_MANAGER"/> + + <constructor-arg value="primary"/> + + <property name="propertiesFiles"> + <map key-type="java.lang.String" value-type="java.lang.String"> + <entry key="src/test/resources/config/distributed/resource.properties" value="target/resmgr/etc/resource.properties"/> + <entry key="src/test/resources/test.logging.properties" value="target/resmgr/etc/logging.properties"/> + </map> + </property> + <property name="configFiles"> + <map key-type="java.lang.String" value-type="java.lang.String"> + <entry key="src/test/resources/policy/nodes.xml" value="target/resmgr/policy/nodes.xml"/> + <entry key="src/test/resources/policy/node-to-queue-mapping.xml" value="target/resmgr/policy/node-to-queue-mapping.xml"/> + </map> + </property> + </bean> +</beans> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/resource/src/test/resources/config/distributed/resource.properties ---------------------------------------------------------------------- diff --git a/resource/src/test/resources/config/distributed/resource.properties b/resource/src/test/resources/config/distributed/resource.properties new file mode 100644 index 0000000..e51fc73 --- /dev/null +++ b/resource/src/test/resources/config/distributed/resource.properties @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# resource batchmgr factory +resource.batchmgr.factory = org.apache.oodt.cas.resource.batchmgr.XmlRpcBatchMgrFactory + +# resource monitor factory +resource.monitor.factory = org.apache.oodt.cas.resource.monitor.AssignmentMonitorFactory + +# resource scheduler factory +resource.scheduler.factory = org.apache.oodt.cas.resource.scheduler.LRUSchedulerFactory + +# resource jobqueue factory +resource.jobqueue.factory = org.apache.oodt.cas.resource.jobqueue.JobStackJobQueueFactory + +# resource job repository factory +resource.jobrepo.factory = org.apache.oodt.cas.resource.jobrepo.MemoryJobRepositoryFactory + +# node repository factory +org.apache.oodt.cas.resource.nodes.repo.factory = org.apache.oodt.cas.resource.noderepo.XmlNodeRepositoryFactory + +# queue repository factory +org.apache.oodt.cas.resource.queues.repo.factory = org.apache.oodt.cas.resource.queuerepo.XmlQueueRepositoryFactory + +# resource nodes monitor factory +org.apache.oodt.cas.resource.monitor.factory = org.apache.oodt.cas.resource.monitor.ganglia.GangliaResourceMonitorFactory + +# ganglia resource monitor's load calculator factory +org.apache.oodt.cas.resource.monitor.loadcalc.factory = org.apache.oodt.cas.resource.monitor.ganglia.loadcalc.WeightedAverageLoadCalcFactory + +# JobStack JobQueue config properties +org.apache.oodt.cas.resource.jobqueue.jobstack.maxstacksize=1000 + +# XML LRUScheduler config properties 
+org.apache.oodt.cas.resource.scheduler.wait.seconds=20 + +# XML-RPC configuration props +org.apache.oodt.cas.resource.system.xmlrpc.requestTimeout.minutes=20 +org.apache.oodt.cas.resource.system.xmlrpc.connectionTimeout.minutes=60 + +# XStream JobRepo configuration props +org.apache.oodt.cas.resource.jobrepo.xstream.working.dir=[RESMGR_HOME]/job-repo +org.apache.oodt.cas.resource.jobrepo.xstream.max.history=4000 + +# XML Node Repository config properties +org.apache.oodt.cas.resource.nodes.dirs=file://[RESMGR_HOME]/target/resmgr/policy + +# XML Queue Repository config properties +org.apache.oodt.cas.resource.nodetoqueues.dirs=file://[RESMGR_HOME]/target/resmgr/policy + +# Load calculation weights +org.apache.oodt.cas.resource.monitor.loadcalc.weight.loadone=1 +org.apache.oodt.cas.resource.monitor.loadcalc.weight.loadfive=5 +org.apache.oodt.cas.resource.monitor.loadcalc.weight.loadfifteen=5 +org.apache.oodt.cas.resource.monitor.loadcalc.weight.memfree=2 +org.apache.oodt.cas.resource.monitor.loadcalc.weight.swapfree=1 + +#ganglia meta daemon (gmetad) host details +org.apache.oodt.cas.resource.monitor.ganglia.gemtad.host.address=localhost +org.apache.oodt.cas.resource.monitor.ganglia.gemtad.host.port=8659 + http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/workflow/pom.xml ---------------------------------------------------------------------- diff --git a/workflow/pom.xml b/workflow/pom.xml index 50e6743..a6e0c44 100644 --- a/workflow/pom.xml +++ b/workflow/pom.xml @@ -138,6 +138,10 @@ the License. <artifactId>oodt-commons</artifactId> </dependency> <dependency> + <groupId>org.apache.oodt</groupId> + <artifactId>oodt-conf</artifactId> + </dependency> + <dependency> <groupId>org.safehaus.jug</groupId> <artifactId>jug</artifactId> <classifier>asl</classifier> @@ -161,6 +165,10 @@ the License. 
<value>${basedir}/src/test/resources/test.logging.properties</value> </property> </systemProperties> + <environmentVariables> + <WORKFLOW_HOME>${project.basedir}</WORKFLOW_HOME> + <OODT_PROJECT>primary</OODT_PROJECT> + </environmentVariables> <forkedProcessTimeoutInSeconds>0</forkedProcessTimeoutInSeconds> <redirectTestOutputToFile>true</redirectTestOutputToFile> <includes> http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManager.java ---------------------------------------------------------------------- diff --git a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManager.java b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManager.java index 37b22d3..b0b4591 100644 --- a/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManager.java +++ b/workflow/src/main/java/org/apache/oodt/cas/workflow/system/XmlRpcWorkflowManager.java @@ -30,6 +30,9 @@ import org.apache.oodt.cas.workflow.structs.exceptions.EngineException; import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException; import org.apache.oodt.cas.workflow.structs.exceptions.RepositoryException; import org.apache.oodt.cas.workflow.util.XmlRpcStructFactory; +import org.apache.oodt.config.Component; +import org.apache.oodt.config.ConfigurationManager; +import org.apache.oodt.config.ConfigurationManagerFactory; import org.apache.xmlrpc.WebServer; import com.google.common.base.Preconditions; @@ -41,6 +44,7 @@ import java.net.InetAddress; import java.net.MalformedURLException; import java.net.URL; import java.net.UnknownHostException; +import java.util.ArrayList; import java.util.Hashtable; import java.util.List; import java.util.Map; @@ -73,6 +77,8 @@ public class XmlRpcWorkflowManager { private final WorkflowEngine engine; private WorkflowRepository repo; + private ConfigurationManager configurationManager; + public XmlRpcWorkflowManager() 
{ this(DEFAULT_WEB_SERVER_PORT); } @@ -80,6 +86,20 @@ public class XmlRpcWorkflowManager { public XmlRpcWorkflowManager(int port) { Preconditions.checkArgument(port > 0, "Must specify a port greater than 0"); + List<String> propertiesFiles = new ArrayList<>(); + String configFile = System.getProperty(PROPERTIES_FILE_PROPERTY); + if (configFile != null) { + propertiesFiles.add(configFile); + } + + configurationManager= ConfigurationManagerFactory.getConfigurationManager(Component.WORKFLOW_MANAGER,propertiesFiles); + try { + configurationManager.loadConfiguration(); + } catch (Exception e) { + LOG.log(Level.SEVERE, "Unable to load configuration", e); + throw new IllegalStateException("Unable to load configuration", e); + } + engine = getWorkflowEngineFromProperty(); engine.setWorkflowManagerUrl(safeGetUrlFromString("http://" + getHostname() + ":" + port)); @@ -95,6 +115,8 @@ public class XmlRpcWorkflowManager { } public boolean shutdown() { + configurationManager.clearConfiguration(); + if (webServer != null) { webServer.shutdown(); webServer = null; @@ -645,7 +667,6 @@ public class XmlRpcWorkflowManager { System.exit(1); } - loadProperties(); new XmlRpcWorkflowManager(portNum); for (; ; ) { @@ -656,17 +677,6 @@ public class XmlRpcWorkflowManager { } } - public static void loadProperties() throws IOException { - String configFile = System.getProperty(PROPERTIES_FILE_PROPERTY); - if (configFile != null) { - LOG.log(Level.INFO, - "Loading Workflow Manager Configuration Properties from: [" - + configFile + "]"); - System.getProperties().load(new FileInputStream(new File( - configFile))); - } - } - private static WorkflowEngine getWorkflowEngineFromProperty() { return getWorkflowEngineFromClassName(System.getProperty( WORKFLOW_ENGINE_FACTORY_PROPERTY, http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/workflow/src/test/java/org/apache/oodt/cas/workflow/system/distributed/TestDistributedXmlRpcWorkflowManager.java 
---------------------------------------------------------------------- diff --git a/workflow/src/test/java/org/apache/oodt/cas/workflow/system/distributed/TestDistributedXmlRpcWorkflowManager.java b/workflow/src/test/java/org/apache/oodt/cas/workflow/system/distributed/TestDistributedXmlRpcWorkflowManager.java new file mode 100644 index 0000000..781a7eb --- /dev/null +++ b/workflow/src/test/java/org/apache/oodt/cas/workflow/system/distributed/TestDistributedXmlRpcWorkflowManager.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oodt.cas.workflow.system.distributed; + +import org.apache.oodt.cas.metadata.Metadata; +import org.apache.oodt.cas.metadata.util.PathUtils; +import org.apache.oodt.cas.workflow.system.XmlRpcWorkflowManager; +import org.apache.oodt.cas.workflow.system.XmlRpcWorkflowManagerClient; +import org.apache.oodt.config.distributed.cli.ConfigPublisher; +import org.apache.oodt.config.test.AbstractDistributedConfigurationTest; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.net.URL; +import java.util.List; + +import static org.apache.oodt.config.Constants.Properties.ENABLE_DISTRIBUTED_CONFIGURATION; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +public class TestDistributedXmlRpcWorkflowManager extends AbstractDistributedConfigurationTest { + + private static final int WM_PORT = 50002; + private static final String CONF_PUBLISHER_XML = "config/distributed/config-publisher.xml"; + + private XmlRpcWorkflowManager workflowManager; + + @Before + public void setUpTest() throws Exception { + System.setProperty("org.apache.oodt.cas.cli.action.spring.config", "../config/src/main/resources/cmd-line-actions.xml"); + System.setProperty("org.apache.oodt.cas.cli.option.spring.config", "../config/src/main/resources/cmd-line-options.xml"); + System.setProperty(ENABLE_DISTRIBUTED_CONFIGURATION, "true"); + + ConfigPublisher.main(new String[]{ + "-connectString", zookeeper.getConnectString(), + "-config", CONF_PUBLISHER_XML, + "-a", "publish" + }); + + try { + workflowManager = new XmlRpcWorkflowManager(WM_PORT); + } catch (Exception e) { + fail(e.getMessage()); + } + + startWorkflow(); + } + + private void startWorkflow() { + XmlRpcWorkflowManagerClient client = null; + try { + client = new XmlRpcWorkflowManagerClient(new URL("http://localhost:" + WM_PORT)); + } catch (Exception e) { + fail(e.getMessage()); + } + + try 
{ + client.sendEvent("long", new Metadata()); + } catch (Exception e) { + throw new RuntimeException(e); + } + + } + + @Test + public void testGetWorkflowInstances() { + List workflowInsts = null; + int numInsts = -1; + while (numInsts != 2) { + try { + workflowInsts = workflowManager.getWorkflowInstances(); + } catch (Exception e) { + e.printStackTrace(); + } + + assertNotNull(workflowInsts); + numInsts = workflowInsts.size(); + } + + assertEquals(2, workflowInsts.size()); + } + + private void deleteAllFiles(String startDir) { + File startDirFile = new File(startDir); + File[] delFiles = startDirFile.listFiles(); + + if (delFiles != null && delFiles.length > 0) { + for (File delFile : delFiles) { + delFile.delete(); + } + } + + startDirFile.delete(); + } + + @After + public void tearDownTest() throws Exception { + if (workflowManager != null) { + workflowManager.shutdown(); + } + + ConfigPublisher.main(new String[]{ + "-connectString", zookeeper.getConnectString(), + "-config", CONF_PUBLISHER_XML, + "-a", "clear" + }); + + System.clearProperty("org.apache.oodt.cas.cli.action.spring.config"); + System.clearProperty("org.apache.oodt.cas.cli.option.spring.config"); + System.clearProperty(ENABLE_DISTRIBUTED_CONFIGURATION); + + String luceneIdx = System.getProperty("org.apache.oodt.cas.workflow.instanceRep.lucene.idxPath"); + if (luceneIdx != null) { + luceneIdx = PathUtils.replaceEnvVariables(luceneIdx); + deleteAllFiles(luceneIdx); + } + } +} http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/workflow/src/test/resources/config/distributed/config-publisher.xml ---------------------------------------------------------------------- diff --git a/workflow/src/test/resources/config/distributed/config-publisher.xml b/workflow/src/test/resources/config/distributed/config-publisher.xml new file mode 100644 index 0000000..f9b8115 --- /dev/null +++ b/workflow/src/test/resources/config/distributed/config-publisher.xml @@ -0,0 +1,56 @@ +<!-- + ~ Licensed to the Apache 
Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<beans xmlns="http://www.springframework.org/schema/beans" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd"> + + <bean id="workflowmgr-config-publisher" class="org.apache.oodt.config.distributed.DistributedConfigurationPublisher"> + <constructor-arg value="WORKFLOW_MANAGER"/> + <constructor-arg value="primary"/> + + <property name="propertiesFiles"> + <map key-type="java.lang.String" value-type="java.lang.String"> + <entry key="src/test/resources/config/distributed/workflow.properties" value="target/workflow/etc/resource.properties"/> + <entry key="src/test/resources/test.logging.properties" value="target/workflow/etc/logging.properties"/> + </map> + </property> + <property name="configFiles"> + <map key-type="java.lang.String" value-type="java.lang.String"> + <entry key="src/main/resources/examples/workflow-lifecycle.xml" value="target/workflow/policy/workflow-lifecycle.xml"/> + <entry key="src/main/resources/examples/workflow-instance-met.xml" value="target/workflow/policy/workflow-instance-met.xml"/> + <entry 
key="src/main/resources/examples/timeout.workflow.xml" value="target/workflow/policy/timeout.workflow.xml"/> + <entry key="src/main/resources/examples/testWorkflow.workflow.xml" value="target/workflow/policy/testWorkflow.workflow.xml"/> + <entry key="src/main/resources/examples/testStatusUpdate.workflow.xml" value="target/workflow/policy/testStatusUpdate.workflow.xml"/> + <entry key="src/main/resources/examples/testMetError.workflow.xml" value="target/workflow/policy/testMetError.workflow.xml"/> + <entry key="src/main/resources/examples/testMetadataUpdate.workflow.xml" value="target/workflow/policy/testMetadataUpdate.workflow.xml"/> + <entry key="src/main/resources/examples/tasks.xml" value="target/workflow/policy/tasks.xml"/> + <entry key="src/main/resources/examples/optional.workflow.xml" value="target/workflow/policy/optional.workflow.xml"/> + <entry key="src/main/resources/examples/mailWorkflow.workflow.xml" value="target/workflow/policy/mailWorkflow.workflow.xml"/> + <entry key="src/main/resources/examples/longWorkflow.workflow.xml" value="target/workflow/policy/longWorkflow.workflow.xml"/> + <entry key="src/main/resources/examples/externalScript.workflow.xml" value="target/workflow/policy/externalScript.workflow.xml"/> + <entry key="src/main/resources/examples/events.xml" value="target/workflow/policy/events.xml"/> + <entry key="src/main/resources/examples/conditions.xml" value="target/workflow/policy/conditions.xml"/> + <entry key="src/main/resources/examples/condition.workflow.xml" value="target/workflow/policy/condition.workflow.xml"/> + + <entry key="src/main/resources/examples/wengine/GranuleMaps.xml" value="target/workflow/policy/wengine/GranuleMaps.xml"/> + <entry key="src/main/resources/examples/wengine/hello-goodbye.xml" value="target/workflow/policy/wengine/hello-goodbye.xml"/> + <entry key="src/main/resources/examples/wengine/wengine-lifecycle.xml" value="target/workflow/policy/wengine/wengine-lifecycle.xml"/> + </map> + </property> + 
</bean> +</beans> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oodt/blob/fc6311db/workflow/src/test/resources/config/distributed/workflow.properties ---------------------------------------------------------------------- diff --git a/workflow/src/test/resources/config/distributed/workflow.properties b/workflow/src/test/resources/config/distributed/workflow.properties new file mode 100644 index 0000000..ced0b21 --- /dev/null +++ b/workflow/src/test/resources/config/distributed/workflow.properties @@ -0,0 +1,87 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +# workflow repository factory +workflow.repo.factory = org.apache.oodt.cas.workflow.repository.XMLWorkflowRepositoryFactory + +# workflow engine factory +workflow.engine.factory = org.apache.oodt.cas.workflow.engine.ThreadPoolWorkflowEngineFactory + +# workflow instance repository factory +workflow.engine.instanceRep.factory = org.apache.oodt.cas.workflow.instrepo.LuceneWorkflowInstanceRepositoryFactory + +# engine runner factory +workflow.wengine.runner.factory=org.apache.oodt.cas.workflow.engine.runner.AsynchronousLocalEngineRunnerFactory + +# thread pool workflow engine properties +org.apache.oodt.cas.workflow.engine.queueSize= +org.apache.oodt.cas.workflow.engine.maxPoolSize= +org.apache.oodt.cas.workflow.engine.minPoolSize=6 +org.apache.oodt.cas.workflow.engine.threadKeepAlive.minutes=5 +org.apache.oodt.cas.workflow.engine.unlimitedQueue=true +org.apache.oodt.cas.workflow.engine.preConditionWaitTime=10 + +# set this if you want the workflow manager to submit jobs through the resource mgr +org.apache.oodt.cas.workflow.engine.resourcemgr.url= + +# if you use the resource mgr submission, you can specify how many seconds the +# workflow manager should wait in between checking to see if a job is complete +org.apache.oodt.cas.workflow.engine.resourcemgr.pollingWaitTime=10 + +# wengine properties +# define workflow prioritizer class to use for sorting workflow tasks +org.apache.oodt.cas.workflow.wengine.prioritizer=org.apache.oodt.cas.workflow.structs.FILOPrioritySorter +# the default amount of time (in seconds) that the task querier waits before dispositioning processors +org.apache.oodt.cas.workflow.wengine.taskquerier.waitSeconds=2 + +# the maximum number of threads to be used by the asynchronous engine runner +org.apache.oodt.cas.workflow.wengine.asynchronous.runner.num.threads=25 + +# workflow instance repository general properties +# default page size to page through WorkflowInstances with +org.apache.oodt.cas.workflow.instanceRep.pageSize=20 + +# lucene 
workflow instance repository properties +org.apache.oodt.cas.workflow.instanceRep.lucene.idxPath=[WORKFLOW_HOME]/target/tmp + +# data source workflow instance repository properties +org.apache.oodt.cas.workflow.instanceRep.datasource.jdbc.url=jdbc:url +org.apache.oodt.cas.workflow.instanceRep.datasource.jdbc.user=user +org.apache.oodt.cas.workflow.instanceRep.datasource.jdbc.pass=pass +org.apache.oodt.cas.workflow.instanceRep.datasource.jdbc.driver=your.jdbc.Driver +org.apache.oodt.cas.workflow.instanceRep.datasource.quoteFields=false + +# XML workflow repository properties +org.apache.oodt.cas.workflow.repo.dirs=file://[WORKFLOW_HOME]/target/workflow/policy +# uncomment the following line and set the value to true to recursively parse repository directories +#org.apache.oodt.cas.workflow.repo.dirs.recursive=false + +# wengine-style packaged workflow repo properties +org.apache.oodt.cas.workflow.wengine.packagedRepo.dir.path = /path/to/wengine/workflow/files + +# data source workflow repository properties +org.apache.oodt.cas.workflow.repo.datasource.jdbc.url=jdbc:url +org.apache.oodt.cas.workflow.repo.datasource.jdbc.user=user +org.apache.oodt.cas.workflow.repo.datasource.jdbc.pass=pass +org.apache.oodt.cas.workflow.repo.datasource.jdbc.driver=your.jdbc.Driver + +# Spring command line option and action store properties +#org.apache.oodt.cas.cli.action.spring.config=src/main/resources/cmd-line-actions.xml +#org.apache.oodt.cas.cli.option.spring.config=src/main/resources/cmd-line-options.xml + +# workflow lifecycle Manager +org.apache.oodt.cas.workflow.lifecycle.filePath=[WORKFLOW_HOME]/target/workflow/policy/workflow-lifecycle.xml