Author: bfoster
Date: Wed Mar 14 01:05:41 2012
New Revision: 1300432
URL: http://svn.apache.org/viewvc?rev=1300432&view=rev
Log:
Added CAS-PGE support for multiple Property Adders
-----------
OODT-406
Added:
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java (with props)
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java (with props)
Modified:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetadataKeys.java
Modified:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java?rev=1300432&r1=1300431&r2=1300432&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java (original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java Wed Mar 14 01:05:41 2012
@@ -15,9 +15,12 @@
* limitations under the License.
*/
-
package org.apache.oodt.cas.pge;
+//OODT static imports
+import static org.apache.oodt.cas.pge.metadata.PgeTaskMetadataKeys.PROPERTY_ADDERS;
+import static org.apache.oodt.cas.pge.metadata.PgeTaskMetadataKeys.PROPERTY_ADDER_CLASSPATH;
+
//JDK imports
import java.util.LinkedList;
import java.util.List;
@@ -62,409 +65,437 @@ import org.apache.oodt.cas.pge.writers.S
//Spring imports
import org.springframework.context.support.FileSystemXmlApplicationContext;
+import com.google.common.collect.Lists;
/**
- *
- * @author mattmann
- * @author bfoster
- * @version $Revision$
- *
- * <p>
* Runs a CAS-style Product Generation Executive based on the PCS Wrapper
- * Architecture from mattmann et al. on OCO
- * </p>.
+ * Architecture from mattmann et al. on OCO.
+ *
+ * @author mattmann (Chris Mattmann)
+ * @author bfoster (Brian Foster)
*/
public class PGETaskInstance implements WorkflowTaskInstance {
-
- protected static Logger LOG =
Logger.getLogger(PGETaskInstance.class.getName());
-
- protected XmlRpcWorkflowManagerClient wm;
-
- protected String workflowInstId;
-
- protected PgeMetadata pgeMetadata;
-
- protected PgeConfig pgeConfig;
-
- protected PGETaskInstance() {}
-
- protected void initialize(Metadata metadata,
- WorkflowTaskConfiguration config) throws InstantiationException {
- try {
- // merge metadata
- this.pgeMetadata = createPgeMetadata(metadata, config);
-
- // create PgeConfig
- this.pgeConfig = this.createPgeConfig();
-
- // load property adder
- String propertyAdderClasspath = this.pgeMetadata
- .getMetadata(PgeTaskMetadataKeys.PROPERTY_ADDER_CLASSPATH);
- if (propertyAdderClasspath != null
- && !propertyAdderClasspath.equals(""))
- this.runPropertyAdder(this
- .loadPropertyAdder(propertyAdderClasspath));
-
- // configure workflow manager
- wm = new XmlRpcWorkflowManagerClient(new URL(this.pgeMetadata
- .getMetadata(PcsMetadataKeys.WORKFLOW_MANAGER_URL)));
- workflowInstId = this.pgeMetadata
- .getMetadata(CoreMetKeys.WORKFLOW_INST_ID);
- } catch (Exception e) {
- e.printStackTrace();
- throw new InstantiationException(
- "Failed to instanciate PGETaskInstance : " +
e.getMessage());
- }
- }
-
- protected PgeMetadata createPgeMetadata(Metadata dynMetadata,
- WorkflowTaskConfiguration config) {
- Metadata staticMetadata = new Metadata();
- for (Object key : config.getProperties().keySet()) {
- staticMetadata.addMetadata((String) key,
- config.getProperty((String) key));
- }
- return new PgeMetadata(staticMetadata, dynMetadata);
- }
-
- protected ConfigFilePropertyAdder loadPropertyAdder(
- String propertyAdderClasspath) throws Exception {
- return (ConfigFilePropertyAdder) Class.forName(propertyAdderClasspath)
- .newInstance();
- }
-
- protected void runPropertyAdder(ConfigFilePropertyAdder propAdder) {
- propAdder.addConfigProperties(this.pgeMetadata, this.pgeConfig
- .getPropertyAdderCustomArgs());
- }
-
- protected PgeConfig createPgeConfig() throws Exception {
- return new XmlFilePgeConfigBuilder().build(this.pgeMetadata);
- }
-
- protected void updateStatus(String status) {
- try {
- LOG.log(Level.INFO, "Updating status to workflow as " + status);
- this.wm.updateWorkflowInstanceStatus(this.workflowInstId, status);
- } catch (Exception e) {
- LOG.log(Level.WARNING, "Failed to update to workflow as " + status
- + " : " + e.getMessage());
- }
- }
-
- protected void prePgeSetup() throws Exception {
- this.createExeDir();
- this.createOuputDirsIfRequested();
- this.createSciPgeConfigFiles();
- }
-
- protected void createExeDir() {
- LOG.log(Level.INFO, "Creating PGE execution working directory: ["
- + this.pgeConfig.getExeDir() + "]");
- new File(this.pgeConfig.getExeDir()).mkdirs();
- }
-
- protected void createOuputDirsIfRequested() {
- for (OutputDir outputDir : this.pgeConfig.getOuputDirs()) {
- if (outputDir.isCreateBeforeExe()) {
- LOG.log(Level.INFO, "Creating PGE file ouput directory: ["
- + outputDir.getPath() + "]");
- new File(outputDir.getPath()).mkdirs();
- }
- }
- }
+ protected static Logger LOG = Logger.getLogger(PGETaskInstance.class
+ .getName());
- protected void createSciPgeConfigFiles() throws IOException {
- this.updateStatus(PgeTaskMetadataKeys.CONF_FILE_BUILD);
- for (DynamicConfigFile dynamicConfigFile : pgeConfig
- .getDynamicConfigFiles()) {
- try {
- this.createSciPgeConfigFile(dynamicConfigFile);
- } catch (Exception e) {
- e.printStackTrace();
- throw new IOException(
- "Failed to created pge input config file ' "
- + dynamicConfigFile.getFilePath() + "' : "
- + e.getMessage());
- }
- }
- }
+ protected XmlRpcWorkflowManagerClient wm;
- protected void createSciPgeConfigFile(DynamicConfigFile dynamicConfigFile)
- throws Exception {
- File parentDir = new File(dynamicConfigFile.getFilePath())
- .getParentFile();
- if (!parentDir.exists())
- parentDir.mkdirs();
- SciPgeConfigFileWriter writer = (SciPgeConfigFileWriter) Class.forName(
- dynamicConfigFile.getWriterClass()).newInstance();
- writer.createConfigFile(dynamicConfigFile.getFilePath(),
- this.pgeMetadata.asMetadata(), dynamicConfigFile.getArgs());
- }
-
- protected void processOutput() throws FileNotFoundException, IOException {
- for (final OutputDir outputDir : this.pgeConfig.getOuputDirs()) {
- File[] createdFiles = new File(outputDir.getPath()).listFiles();
- for (File createdFile : createdFiles) {
- Metadata outputMetadata = new Metadata();
- for (RegExprOutputFiles regExprFiles : outputDir
- .getRegExprOutputFiles()) {
- if (Pattern.matches(regExprFiles.getRegExp(), createdFile
- .getName())) {
- try {
- PcsMetFileWriter writer = (PcsMetFileWriter) Class
- .forName(regExprFiles.getConverterClass())
- .newInstance();
-
outputMetadata.replaceMetadata(this.getMetadataForFile(
-
(regExprFiles.getRenamingConv() != null)
- ? createdFile =
this.renameFile(createdFile, regExprFiles.getRenamingConv())
- : createdFile,
writer, regExprFiles.getArgs()));
- } catch (Exception e) {
- LOG.log(Level.SEVERE,
- "Failed to create metadata file for '"
- + createdFile + "' : "
- + e.getMessage(), e);
- }
- }
- }
- if (outputMetadata.getAllKeys().size() > 0)
- this.writeFromMetadata(outputMetadata,
createdFile.getAbsolutePath()
- + "." +
this.pgeMetadata.getMetadata(PcsMetadataKeys.MET_FILE_EXT));
- }
- }
- }
+ protected String workflowInstId;
- protected File renameFile(File file, RenamingConv renamingConv) throws
Exception {
- Metadata curMetadata = this.pgeMetadata.asMetadata();
- curMetadata.replaceMetadata(renamingConv.getTmpReplaceMet());
- String newFileName =
PathUtils.doDynamicReplacement(renamingConv.getRenamingString(), curMetadata);
- File newFile = new File(file.getParentFile(), newFileName);
- LOG.log(Level.INFO, "Renaming file '" + file.getAbsolutePath()
- + "' to '" + newFile.getAbsolutePath() + "'");
- if (!file.renameTo(newFile))
- throw new IOException("Renaming returned false");
- return newFile;
- }
-
- protected Metadata getMetadataForFile(File sciPgeCreatedDataFile,
- PcsMetFileWriter writer, Object[] args) throws Exception {
- return writer.getMetadataForFile(sciPgeCreatedDataFile,
this.pgeMetadata,
- args);
- }
-
- protected void writeFromMetadata(Metadata metadata, String toMetFilePath)
- throws FileNotFoundException, IOException {
- new SerializableMetadata(metadata, "UTF-8", false)
- .writeMetadataToXmlStream(new
FileOutputStream(toMetFilePath));
- }
-
- protected ScriptFile buildPgeRunScript() {
- ScriptFile sf = new ScriptFile(this.pgeConfig.getShellType());
- sf.setCommands(this.pgeConfig.getExeCmds());
- return sf;
- }
-
- protected String getScriptPath() {
- return new File(this.pgeConfig.getExeDir()).getAbsolutePath() + "/"
- + this.getPgeScriptName();
- }
-
- protected String getPgeScriptName() {
- return "sciPgeExeScript_"
- + this.pgeMetadata.getMetadata(PgeTaskMetadataKeys.NAME);
- }
-
- protected Handler initializePgeLogHandler() throws SecurityException,
- IOException {
- FileHandler handler = null;
- String logFilePattern = this.pgeMetadata
-
.getMetadata(PgeTaskMetadataKeys.LOG_FILE_PATTERN);
- if (logFilePattern != null) {
- LOG.log(Level.INFO, "Creating Log Handler to capture
pge output to file '"
- + logFilePattern + "'");
- new File(logFilePattern).getParentFile().mkdirs();
- handler = new FileHandler(logFilePattern);
- handler.setFormatter(new SimpleFormatter());
-
- }
- return handler;
- }
-
- protected Logger initializePgeLogger(Handler handler) {
- if (handler != null) {
- Logger pgeLogger = Logger.getLogger(this.pgeMetadata
- .getMetadata(PgeTaskMetadataKeys.NAME)
- + System.currentTimeMillis());
- pgeLogger.addHandler(handler);
- return pgeLogger;
- }else {
- return LOG;
- }
- }
-
- protected void closePgeLogHandler(Logger logger, Handler handler) {
- if (logger != null && handler != null) {
- logger.removeHandler(handler);
- handler.close();
- }
- }
-
- protected void runPge() throws Exception {
- ScriptFile sf = null;
- Handler handler = null;
- Logger pgeLogger = null;
- try {
- long startTime = System.currentTimeMillis();
-
- // create script to run
- sf = this.buildPgeRunScript();
- sf.writeScriptFile(this.getScriptPath());
-
- // run script and evaluate whether success or failure
- handler = this.initializePgeLogHandler();
- pgeLogger = this.initializePgeLogger(handler);
- this.updateStatus(PgeTaskMetadataKeys.RUNNING_PGE);
- if (!this.wasPgeSuccessful(ExecUtils.callProgram(this.pgeConfig
- .getShellType()
- + " " + this.getScriptPath(), pgeLogger, new
File(this.pgeConfig
- .getExeDir()).getAbsoluteFile())))
- throw new RuntimeException("Pge didn't finish successfully");
- else
- LOG.log(Level.INFO, "Successfully completed running: '"
- + sf.getCommands() + "'");
-
-
- long endTime = System.currentTimeMillis();
- this.pgeMetadata.replaceMetadata(PgeTaskMetadataKeys.PGE_RUNTIME,
- (endTime - startTime) + "");
+ protected PgeMetadata pgeMetadata;
- } catch (Exception e) {
+ protected PgeConfig pgeConfig;
+
+ protected PGETaskInstance() {
+ }
+
+ protected void initialize(Metadata metadata, WorkflowTaskConfiguration config)
+ throws InstantiationException {
+ try {
+ // merge metadata
+ this.pgeMetadata = createPgeMetadata(metadata, config);
+
+ // create PgeConfig
+ this.pgeConfig = this.createPgeConfig();
+
+ // run Property Adders.
+ runPropertyAdders();
+
+ // configure workflow manager
+ wm = new XmlRpcWorkflowManagerClient(new URL(
+ this.pgeMetadata
+ .getMetadata(PcsMetadataKeys.WORKFLOW_MANAGER_URL)));
+ workflowInstId = this.pgeMetadata
+ .getMetadata(CoreMetKeys.WORKFLOW_INST_ID);
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new InstantiationException(
+ "Failed to instanciate PGETaskInstance : " + e.getMessage());
+ }
+ }
+
+ protected PgeMetadata createPgeMetadata(Metadata dynMetadata,
+ WorkflowTaskConfiguration config) {
+ Metadata staticMetadata = new Metadata();
+ for (Object key : config.getProperties().keySet()) {
+ if (PROPERTY_ADDERS.equals((String) key)
+ || PROPERTY_ADDER_CLASSPATH.equals((String) key)) {
+ staticMetadata.addMetadata((String) key,
+ Lists.newArrayList(config.getProperty((String) key).split(",")));
+ } else {
+ staticMetadata.addMetadata((String) key,
+ config.getProperty((String) key));
+ }
+ }
+ return new PgeMetadata(staticMetadata, dynMetadata);
+ }
+
+ protected void runPropertyAdders() throws Exception {
+ try {
+ List<String> propertyAdders = pgeMetadata
+ .getAllMetadata(PROPERTY_ADDERS);
+ if (propertyAdders == null) {
+ // Left for backwards compatibility.
+ propertyAdders = pgeMetadata
+ .getAllMetadata(PROPERTY_ADDER_CLASSPATH);
+ }
+ if (propertyAdders != null) {
+ for (String propertyAdder : propertyAdders) {
+ runPropertyAdder(loadPropertyAdder(propertyAdder));
+ }
+ }
+ } catch (Exception e) {
+ throw new Exception("Failed to instanciate/run Property Adders : "
+ + e.getMessage(), e);
+ }
+ }
+
+ protected ConfigFilePropertyAdder loadPropertyAdder(
+ String propertyAdderClasspath) throws Exception {
+ return (ConfigFilePropertyAdder) Class.forName(propertyAdderClasspath)
+ .newInstance();
+ }
+
+ protected void runPropertyAdder(ConfigFilePropertyAdder propAdder) {
+ propAdder.addConfigProperties(this.pgeMetadata,
+ this.pgeConfig.getPropertyAdderCustomArgs());
+ }
+
+ protected PgeConfig createPgeConfig() throws Exception {
+ return new XmlFilePgeConfigBuilder().build(this.pgeMetadata);
+ }
+
+ protected void updateStatus(String status) {
+ try {
+ LOG.log(Level.INFO, "Updating status to workflow as " + status);
+ this.wm.updateWorkflowInstanceStatus(this.workflowInstId, status);
+ } catch (Exception e) {
+ LOG.log(Level.WARNING, "Failed to update to workflow as " + status
+ + " : " + e.getMessage());
+ }
+ }
+
+ protected void prePgeSetup() throws Exception {
+ this.createExeDir();
+ this.createOuputDirsIfRequested();
+ this.createSciPgeConfigFiles();
+ }
+
+ protected void createExeDir() {
+ LOG.log(Level.INFO, "Creating PGE execution working directory: ["
+ + this.pgeConfig.getExeDir() + "]");
+ new File(this.pgeConfig.getExeDir()).mkdirs();
+ }
+
+ protected void createOuputDirsIfRequested() {
+ for (OutputDir outputDir : this.pgeConfig.getOuputDirs()) {
+ if (outputDir.isCreateBeforeExe()) {
+ LOG.log(Level.INFO, "Creating PGE file ouput directory: ["
+ + outputDir.getPath() + "]");
+ new File(outputDir.getPath()).mkdirs();
+ }
+ }
+ }
+
+ protected void createSciPgeConfigFiles() throws IOException {
+ this.updateStatus(PgeTaskMetadataKeys.CONF_FILE_BUILD);
+ for (DynamicConfigFile dynamicConfigFile : pgeConfig
+ .getDynamicConfigFiles()) {
+ try {
+ this.createSciPgeConfigFile(dynamicConfigFile);
+ } catch (Exception e) {
e.printStackTrace();
- throw new Exception("Exception when executing PGE commands '"
- + (sf != null ? sf.getCommands() : "NULL") + "' : "
- + e.getMessage());
- }finally {
- this.closePgeLogHandler(pgeLogger, handler);
- }
- }
-
- protected boolean wasPgeSuccessful(int returnCode) {
- return returnCode == 0;
- }
-
- protected void ingestProducts() throws Exception {
- StdProductCrawler crawler = new StdProductCrawler();
- this.setCrawlerConfigurations(crawler);
- this.runIngestCrawler(crawler, this.getOutputDirs());
- }
-
- protected List<File> getOutputDirs() {
- List<File> outputDirs = new LinkedList<File>();
- for (OutputDir outputDir : pgeConfig.getOuputDirs())
- outputDirs.add(new File(outputDir.getPath()));
- return outputDirs;
- }
-
- protected void setCrawlerConfigurations(StdProductCrawler crawler)
- throws Exception {
- crawler.setMetFileExtension(this.pgeMetadata
- .getMetadata(PcsMetadataKeys.MET_FILE_EXT));
- crawler
- .setClientTransferer(this.pgeMetadata
-
.getMetadata(PcsMetadataKeys.CLIENT_TRANSFER_SERVICE_FACTORY));
- crawler.setFilemgrUrl(this.pgeMetadata
- .getMetadata(PcsMetadataKeys.FILE_MANAGER_URL));
- String actionRepoFile = this.pgeMetadata
- .getMetadata(PcsMetadataKeys.ACTION_REPO_FILE);
- if (actionRepoFile != null && !actionRepoFile.equals("")) {
- crawler.setApplicationContext(new FileSystemXmlApplicationContext(
- actionRepoFile));
- crawler.setActionIds(this.pgeMetadata
- .getAllMetadata(PcsMetadataKeys.ACTION_IDS));
- }
- crawler.setRequiredMetadata(this.pgeMetadata
- .getAllMetadata(PcsMetadataKeys.REQUIRED_METADATA));
- String crawlForDirsString = this.pgeMetadata
- .getMetadata(PcsMetadataKeys.CRAWLER_CRAWL_FOR_DIRS);
- boolean crawlForDirs = (crawlForDirsString != null) ?
crawlForDirsString
- .toLowerCase().equals("true")
- : false;
- String recurString = this.pgeMetadata
- .getMetadata(PcsMetadataKeys.CRAWLER_RECUR);
- boolean recur = (recurString != null) ? recurString.toLowerCase()
- .equals("true") : true;
- crawler.setCrawlForDirs(crawlForDirs);
- crawler.setNoRecur(!recur);
- LOG.log(Level.INFO, "Passing Workflow Metadata to CAS-Crawler as global
metadata . . .");
-
crawler.setGlobalMetadata(this.pgeMetadata.asMetadata(PgeMetadata.Type.DYNAMIC));
- }
-
- protected void runIngestCrawler(StdProductCrawler crawler,
- List<File> crawlDirs) {
- File currentDir = null;
- try {
- this.updateStatus(PgeTaskMetadataKeys.CRAWLING);
- boolean attemptIngestAll =
Boolean.parseBoolean(this.pgeMetadata
-
.getMetadata(PgeTaskMetadataKeys.ATTEMPT_INGEST_ALL));
- for (File crawlDir : crawlDirs) {
- currentDir = crawlDir;
- LOG.log(Level.INFO,
- "Executing StdProductCrawler in
productPath: ["
- + crawlDir +
"]");
- crawler.crawl(crawlDir);
- if (!attemptIngestAll)
- this.verifyIngests(crawler);
- }
- if (attemptIngestAll)
- this.verifyIngests(crawler);
- } catch (Exception e) {
- LOG.log(Level.WARNING,
- "Failed while attempting to ingest
products while crawling directory '"
- + currentDir
- + "' (all products may
not have been ingested) : "
- + e.getMessage(), e);
- }
- }
-
- protected void verifyIngests(ProductCrawler crawler) throws Exception {
- boolean ingestsSuccess = true;
- String exceptionMsg = "";
- for (IngestStatus status : crawler.getIngestStatus()) {
- if (status.getResult().equals(IngestStatus.Result.FAILURE)) {
- exceptionMsg += (exceptionMsg.equals("") ? "" : " : ")
- + "Failed to ingest product
[file='"
- +
status.getProduct().getAbsolutePath() + "',result='"
- + status.getResult() +
"',msg='" + status.getMessage()
- + "']";
- ingestsSuccess = false;
- }else if
(!status.getResult().equals(IngestStatus.Result.SUCCESS)) {
- LOG.log(Level.WARNING, "Product was not ingested [file='"
- +
status.getProduct().getAbsolutePath() + "',result='"
- + status.getResult() +
"',msg='" + status.getMessage()
- + "']");
- }
- }
- if (!ingestsSuccess)
- throw new Exception(exceptionMsg);
- }
-
- protected void updateDynamicMetadata() {
- this.pgeMetadata.commitMarkedDynamicMetadataKeys();
- }
-
- public void run(Metadata metadata, WorkflowTaskConfiguration config)
- throws WorkflowTaskInstanceException {
- try {
- this.initialize(metadata, config);
- this.prePgeSetup();
- this.runPge();
- this.processOutput();
- this.updateDynamicMetadata();
- this.ingestProducts();
- } catch (Exception e) {
- throw new WorkflowTaskInstanceException("PGETask failed : "
- + e.getMessage(), e);
- }
- }
+ throw new IOException("Failed to created pge input config file ' "
+ + dynamicConfigFile.getFilePath() + "' : " + e.getMessage());
+ }
+ }
+ }
+
+ protected void createSciPgeConfigFile(DynamicConfigFile dynamicConfigFile)
+ throws Exception {
+ File parentDir = new File(dynamicConfigFile.getFilePath())
+ .getParentFile();
+ if (!parentDir.exists())
+ parentDir.mkdirs();
+ SciPgeConfigFileWriter writer = (SciPgeConfigFileWriter) Class.forName(
+ dynamicConfigFile.getWriterClass()).newInstance();
+ writer.createConfigFile(dynamicConfigFile.getFilePath(),
+ this.pgeMetadata.asMetadata(), dynamicConfigFile.getArgs());
+ }
+
+ protected void processOutput() throws FileNotFoundException, IOException {
+ for (final OutputDir outputDir : this.pgeConfig.getOuputDirs()) {
+ File[] createdFiles = new File(outputDir.getPath()).listFiles();
+ for (File createdFile : createdFiles) {
+ Metadata outputMetadata = new Metadata();
+ for (RegExprOutputFiles regExprFiles : outputDir
+ .getRegExprOutputFiles()) {
+ if (Pattern.matches(regExprFiles.getRegExp(),
+ createdFile.getName())) {
+ try {
+ PcsMetFileWriter writer = (PcsMetFileWriter) Class
+ .forName(regExprFiles.getConverterClass())
+ .newInstance();
+ outputMetadata
+ .replaceMetadata(this.getMetadataForFile(
+ (regExprFiles.getRenamingConv() != null) ? createdFile = this
+ .renameFile(createdFile,
+ regExprFiles.getRenamingConv())
+ : createdFile, writer, regExprFiles
+ .getArgs()));
+ } catch (Exception e) {
+ LOG.log(Level.SEVERE,
+ "Failed to create metadata file for '" + createdFile
+ + "' : " + e.getMessage(), e);
+ }
+ }
+ }
+ if (outputMetadata.getAllKeys().size() > 0)
+ this.writeFromMetadata(
+ outputMetadata,
+ createdFile.getAbsolutePath()
+ + "."
+ + this.pgeMetadata
+ .getMetadata(PcsMetadataKeys.MET_FILE_EXT));
+ }
+ }
+ }
+
+ protected File renameFile(File file, RenamingConv renamingConv)
+ throws Exception {
+ Metadata curMetadata = this.pgeMetadata.asMetadata();
+ curMetadata.replaceMetadata(renamingConv.getTmpReplaceMet());
+ String newFileName = PathUtils.doDynamicReplacement(
+ renamingConv.getRenamingString(), curMetadata);
+ File newFile = new File(file.getParentFile(), newFileName);
+ LOG.log(Level.INFO, "Renaming file '" + file.getAbsolutePath() + "' to '"
+ + newFile.getAbsolutePath() + "'");
+ if (!file.renameTo(newFile))
+ throw new IOException("Renaming returned false");
+ return newFile;
+ }
+
+ protected Metadata getMetadataForFile(File sciPgeCreatedDataFile,
+ PcsMetFileWriter writer, Object[] args) throws Exception {
+ return writer.getMetadataForFile(sciPgeCreatedDataFile, this.pgeMetadata,
+ args);
+ }
+
+ protected void writeFromMetadata(Metadata metadata, String toMetFilePath)
+ throws FileNotFoundException, IOException {
+ new SerializableMetadata(metadata, "UTF-8", false)
+ .writeMetadataToXmlStream(new FileOutputStream(toMetFilePath));
+ }
+
+ protected ScriptFile buildPgeRunScript() {
+ ScriptFile sf = new ScriptFile(this.pgeConfig.getShellType());
+ sf.setCommands(this.pgeConfig.getExeCmds());
+ return sf;
+ }
+
+ protected String getScriptPath() {
+ return new File(this.pgeConfig.getExeDir()).getAbsolutePath() + "/"
+ + this.getPgeScriptName();
+ }
+
+ protected String getPgeScriptName() {
+ return "sciPgeExeScript_"
+ + this.pgeMetadata.getMetadata(PgeTaskMetadataKeys.NAME);
+ }
+
+ protected Handler initializePgeLogHandler() throws SecurityException,
+ IOException {
+ FileHandler handler = null;
+ String logFilePattern = this.pgeMetadata
+ .getMetadata(PgeTaskMetadataKeys.LOG_FILE_PATTERN);
+ if (logFilePattern != null) {
+ LOG.log(Level.INFO,
+ "Creating Log Handler to capture pge output to file '"
+ + logFilePattern + "'");
+ new File(logFilePattern).getParentFile().mkdirs();
+ handler = new FileHandler(logFilePattern);
+ handler.setFormatter(new SimpleFormatter());
+
+ }
+ return handler;
+ }
+
+ protected Logger initializePgeLogger(Handler handler) {
+ if (handler != null) {
+ Logger pgeLogger = Logger.getLogger(this.pgeMetadata
+ .getMetadata(PgeTaskMetadataKeys.NAME)
+ + System.currentTimeMillis());
+ pgeLogger.addHandler(handler);
+ return pgeLogger;
+ } else {
+ return LOG;
+ }
+ }
+
+ protected void closePgeLogHandler(Logger logger, Handler handler) {
+ if (logger != null && handler != null) {
+ logger.removeHandler(handler);
+ handler.close();
+ }
+ }
+
+ protected void runPge() throws Exception {
+ ScriptFile sf = null;
+ Handler handler = null;
+ Logger pgeLogger = null;
+ try {
+ long startTime = System.currentTimeMillis();
+
+ // create script to run
+ sf = this.buildPgeRunScript();
+ sf.writeScriptFile(this.getScriptPath());
+
+ // run script and evaluate whether success or failure
+ handler = this.initializePgeLogHandler();
+ pgeLogger = this.initializePgeLogger(handler);
+ this.updateStatus(PgeTaskMetadataKeys.RUNNING_PGE);
+ if (!this.wasPgeSuccessful(ExecUtils.callProgram(
+ this.pgeConfig.getShellType() + " " + this.getScriptPath(),
+ pgeLogger,
+ new File(this.pgeConfig.getExeDir()).getAbsoluteFile())))
+ throw new RuntimeException("Pge didn't finish successfully");
+ else
+ LOG.log(Level.INFO,
+ "Successfully completed running: '" + sf.getCommands() + "'");
+
+ long endTime = System.currentTimeMillis();
+ this.pgeMetadata.replaceMetadata(PgeTaskMetadataKeys.PGE_RUNTIME,
+ (endTime - startTime) + "");
+
+ } catch (Exception e) {
+ e.printStackTrace();
+ throw new Exception("Exception when executing PGE commands '"
+ + (sf != null ? sf.getCommands() : "NULL") + "' : "
+ + e.getMessage());
+ } finally {
+ this.closePgeLogHandler(pgeLogger, handler);
+ }
+ }
+
+ protected boolean wasPgeSuccessful(int returnCode) {
+ return returnCode == 0;
+ }
+
+ protected void ingestProducts() throws Exception {
+ StdProductCrawler crawler = new StdProductCrawler();
+ this.setCrawlerConfigurations(crawler);
+ this.runIngestCrawler(crawler, this.getOutputDirs());
+ }
+
+ protected List<File> getOutputDirs() {
+ List<File> outputDirs = new LinkedList<File>();
+ for (OutputDir outputDir : pgeConfig.getOuputDirs())
+ outputDirs.add(new File(outputDir.getPath()));
+ return outputDirs;
+ }
+
+ protected void setCrawlerConfigurations(StdProductCrawler crawler)
+ throws Exception {
+ crawler.setMetFileExtension(this.pgeMetadata
+ .getMetadata(PcsMetadataKeys.MET_FILE_EXT));
+ crawler.setClientTransferer(this.pgeMetadata
+ .getMetadata(PcsMetadataKeys.CLIENT_TRANSFER_SERVICE_FACTORY));
+ crawler.setFilemgrUrl(this.pgeMetadata
+ .getMetadata(PcsMetadataKeys.FILE_MANAGER_URL));
+ String actionRepoFile = this.pgeMetadata
+ .getMetadata(PcsMetadataKeys.ACTION_REPO_FILE);
+ if (actionRepoFile != null && !actionRepoFile.equals("")) {
+ crawler.setApplicationContext(new FileSystemXmlApplicationContext(
+ actionRepoFile));
+ List<String> actionIds = pgeMetadata
+ .getAllMetadata(PcsMetadataKeys.ACTION_IDS);
+ if (actionIds != null) {
+ crawler.setActionIds(actionIds);
+ }
+ }
+ crawler.setRequiredMetadata(this.pgeMetadata
+ .getAllMetadata(PcsMetadataKeys.REQUIRED_METADATA));
+ String crawlForDirsString = this.pgeMetadata
+ .getMetadata(PcsMetadataKeys.CRAWLER_CRAWL_FOR_DIRS);
+ boolean crawlForDirs = (crawlForDirsString != null) ? crawlForDirsString
+ .toLowerCase().equals("true") : false;
+ String recurString = this.pgeMetadata
+ .getMetadata(PcsMetadataKeys.CRAWLER_RECUR);
+ boolean recur = (recurString != null) ? recurString.toLowerCase().equals(
+ "true") : true;
+ crawler.setCrawlForDirs(crawlForDirs);
+ crawler.setNoRecur(!recur);
+ LOG.log(Level.INFO,
+ "Passing Workflow Metadata to CAS-Crawler as global metadata . . .");
+ crawler.setGlobalMetadata(this.pgeMetadata
+ .asMetadata(PgeMetadata.Type.DYNAMIC));
+ }
+
+ protected void runIngestCrawler(StdProductCrawler crawler,
+ List<File> crawlDirs) {
+ File currentDir = null;
+ try {
+ this.updateStatus(PgeTaskMetadataKeys.CRAWLING);
+ boolean attemptIngestAll = Boolean.parseBoolean(this.pgeMetadata
+ .getMetadata(PgeTaskMetadataKeys.ATTEMPT_INGEST_ALL));
+ for (File crawlDir : crawlDirs) {
+ currentDir = crawlDir;
+ LOG.log(Level.INFO, "Executing StdProductCrawler in productPath: ["
+ + crawlDir + "]");
+ crawler.crawl(crawlDir);
+ if (!attemptIngestAll)
+ this.verifyIngests(crawler);
+ }
+ if (attemptIngestAll)
+ this.verifyIngests(crawler);
+ } catch (Exception e) {
+ LOG.log(
+ Level.WARNING,
+ "Failed while attempting to ingest products while crawling directory '"
+ + currentDir
+ + "' (all products may not have been ingested) : "
+ + e.getMessage(), e);
+ }
+ }
+
+ protected void verifyIngests(ProductCrawler crawler) throws Exception {
+ boolean ingestsSuccess = true;
+ String exceptionMsg = "";
+ for (IngestStatus status : crawler.getIngestStatus()) {
+ if (status.getResult().equals(IngestStatus.Result.FAILURE)) {
+ exceptionMsg += (exceptionMsg.equals("") ? "" : " : ")
+ + "Failed to ingest product [file='"
+ + status.getProduct().getAbsolutePath() + "',result='"
+ + status.getResult() + "',msg='" + status.getMessage() + "']";
+ ingestsSuccess = false;
+ } else if (!status.getResult().equals(IngestStatus.Result.SUCCESS)) {
+ LOG.log(Level.WARNING, "Product was not ingested [file='"
+ + status.getProduct().getAbsolutePath() + "',result='"
+ + status.getResult() + "',msg='" + status.getMessage() + "']");
+ }
+ }
+ if (!ingestsSuccess)
+ throw new Exception(exceptionMsg);
+ }
+
+ protected void updateDynamicMetadata() {
+ this.pgeMetadata.commitMarkedDynamicMetadataKeys();
+ }
+
+ public void run(Metadata metadata, WorkflowTaskConfiguration config)
+ throws WorkflowTaskInstanceException {
+ try {
+ this.initialize(metadata, config);
+ this.prePgeSetup();
+ this.runPge();
+ this.processOutput();
+ this.updateDynamicMetadata();
+ this.ingestProducts();
+ } catch (Exception e) {
+ throw new WorkflowTaskInstanceException("PGETask failed : "
+ + e.getMessage(), e);
+ }
+ }
}
Modified:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java?rev=1300432&r1=1300431&r2=1300432&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java (original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java Wed Mar 14 01:05:41 2012
@@ -215,7 +215,10 @@ public class XmlFilePgeConfigBuilder imp
.toLowerCase().equals("true"))
localPgeMetadata.markAsDynamicMetadataKey(key);
- curPlusLocalMetadata.replaceMetadata(key,
curPgeMetadata.getAllMetadata(key));
+ List<String> values = curPgeMetadata.getAllMetadata(key);
+ if (values != null) {
+ curPlusLocalMetadata.replaceMetadata(key, values);
+ }
}
}
return localPgeMetadata;
Modified:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java?rev=1300432&r1=1300431&r2=1300432&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java (original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java Wed Mar 14 01:05:41 2012
@@ -396,7 +396,10 @@ public class PgeMetadata {
case LOCAL:
combinedMetadata.replaceMetadata(localMetadata);
for (String key : keyLinkMap.keySet()) {
- combinedMetadata.replaceMetadata(key, getAllMetadata(key));
+ List<String> values = getAllMetadata(key);
+ if (values != null) {
+ combinedMetadata.replaceMetadata(key, values);
+ }
}
break;
}
@@ -446,7 +449,7 @@ public class PgeMetadata {
break;
}
}
- return Lists.newArrayList();
+ return null;
}
/**
@@ -455,6 +458,6 @@ public class PgeMetadata {
*/
public String getMetadata(String key, Type... types) {
List<String> values = getAllMetadata(key, types);
- return (values.size() > 0) ? values.get(0) : null;
+ return values != null ? values.get(0) : null;
}
}
Modified:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetadataKeys.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetadataKeys.java?rev=1300432&r1=1300431&r2=1300432&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetadataKeys.java (original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetadataKeys.java Wed Mar 14 01:05:41 2012
@@ -14,33 +14,39 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-
package org.apache.oodt.cas.pge.metadata;
/**
- *
- * @author bfoster
- * @version $Revision$
+ * PGETaskInstance Reserved Metadata keys.
*
- * <p>Describe your class here</p>.
+ * @author bfoster (Brian Foster)
*/
public interface PgeTaskMetadataKeys {
public static final String NAME = "PGETask_Name";
+ /** @deprecated Never used. */
+ @Deprecated
public static final String SCI_EXE_PATH = "PGETask_SciExe_Path";
+ /** @deprecated Never used. */
+ @Deprecated
public static final String SCI_EXE_VERSION = "PGETask_SciExe_Version";
+ /** @deprecated Never used. */
+ @Deprecated
public static final String PRODUCT_PATH = "PGETask_ProductPath";
public static final String CONFIG_FILE_PATH = "PGETask_ConfigFilePath";
public static final String LOG_FILE_PATTERN = "PGETask_LogFilePattern";
+ /** @deprecated Use {@link #PROPERTY_ADDERS} instead. */
+ @Deprecated
public static final String PROPERTY_ADDER_CLASSPATH =
"PGETask_PropertyAdderClasspath";
+ public static final String PROPERTY_ADDERS = "PGETask_PropertyAdders";
+
public static final String PGE_RUNTIME = "PGETask_Runtime";
public static final String ATTEMPT_INGEST_ALL = "PGETask_AttemptIngestAll";
Added:
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java?rev=1300432&view=auto
==============================================================================
---
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java
(added)
+++
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java
Wed Mar 14 01:05:41 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge;
+
+//OODT imports
+import org.apache.oodt.cas.pge.metadata.PgeMetadata;
+
+/**
+ * Mock implementation of {@link ConfigFilePropertyAdder}.
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class MockConfigFilePropertyAdder implements ConfigFilePropertyAdder {
+
+  /** Metadata key under which this mock records how many times it has been run. */
+  public static final String RUN_COUNTER = "MockConfigFilePropertyAdder/RunCounter";
+
+  /**
+   * Stores {@code objs[1]} under the key {@code objs[0]} (both are cast to String), then updates
+   * {@link #RUN_COUNTER}: "1" when it is absent, otherwise the stored count plus one.
+   */
+  public void addConfigProperties(PgeMetadata metadata, Object... objs) {
+    metadata.replaceMetadata((String) objs[0], (String) objs[1]);
+    String previousRuns = metadata.getMetadata(RUN_COUNTER);
+    int runs = (previousRuns == null) ? 1 : Integer.parseInt(previousRuns) + 1;
+    metadata.replaceMetadata(RUN_COUNTER, Integer.toString(runs));
+  }
+}
Propchange:
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added: oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java?rev=1300432&view=auto
==============================================================================
--- oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
(added)
+++ oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
Wed Mar 14 01:05:41 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge;
+
+//OODT static imports
+import static
org.apache.oodt.cas.pge.metadata.PgeTaskMetadataKeys.PROPERTY_ADDERS;
+import static
org.apache.oodt.cas.pge.metadata.PgeTaskMetadataKeys.PROPERTY_ADDER_CLASSPATH;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.pge.PGETaskInstance;
+import org.apache.oodt.cas.pge.config.PgeConfig;
+import org.apache.oodt.cas.pge.metadata.PgeMetadata;
+
+//Google imports
+import com.google.common.collect.Lists;
+
+//JUnit imports
+import junit.framework.TestCase;
+
+/**
+ * Test class for {@link PGETaskInstance}.
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class TestPGETaskInstance extends TestCase {
+
+  private static final String MOCK_ADDER = MockConfigFilePropertyAdder.class.getCanonicalName();
+
+  /** Loading an adder by class name yields an instance of that class. */
+  public void testLoadPropertyAdders() throws Exception {
+    ConfigFilePropertyAdder adder = new PGETaskInstance().loadPropertyAdder(MOCK_ADDER);
+    assertNotNull(adder);
+    assertTrue(adder instanceof MockConfigFilePropertyAdder);
+  }
+
+  /** Adders named under either metadata key are run, once per listed class name. */
+  public void testRunPropertyAdders() throws Exception {
+    PGETaskInstance pgeTask = new PGETaskInstance();
+    // Deprecated single-adder key, supplied through static metadata.
+    Metadata staticMet = new Metadata();
+    staticMet.addMetadata(PROPERTY_ADDER_CLASSPATH, MOCK_ADDER);
+    runAdders(pgeTask, staticMet, new Metadata());
+    assertEquals("value", pgeTask.pgeMetadata.getMetadata("key"));
+    // New key naming one adder, supplied through dynamic metadata.
+    Metadata dynMet = new Metadata();
+    dynMet.addMetadata(PROPERTY_ADDERS, MOCK_ADDER);
+    runAdders(pgeTask, new Metadata(), dynMet);
+    assertEquals("value", pgeTask.pgeMetadata.getMetadata("key"));
+    assertEquals("1", pgeTask.pgeMetadata.getMetadata(MockConfigFilePropertyAdder.RUN_COUNTER));
+    // New key naming the same adder twice: the run counter must reach "2".
+    dynMet = new Metadata();
+    dynMet.addMetadata(PROPERTY_ADDERS, Lists.newArrayList(MOCK_ADDER, MOCK_ADDER));
+    runAdders(pgeTask, new Metadata(), dynMet);
+    assertEquals("value", pgeTask.pgeMetadata.getMetadata("key"));
+    assertEquals("2", pgeTask.pgeMetadata.getMetadata(MockConfigFilePropertyAdder.RUN_COUNTER));
+  }
+
+  /** Installs fresh metadata and the ("key", "value") custom args on the task, then runs its adders. */
+  private static void runAdders(PGETaskInstance pgeTask, Metadata staticMet, Metadata dynMet) throws Exception {
+    pgeTask.pgeMetadata = new PgeMetadata(staticMet, dynMet);
+    pgeTask.pgeConfig = new PgeConfig();
+    pgeTask.pgeConfig.setPropertyAdderCustomArgs(new Object[] { "key", "value" });
+    pgeTask.runPropertyAdders();
+  }
+}
Propchange:
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
------------------------------------------------------------------------------
svn:mime-type = text/plain