Author: bfoster
Date: Wed Mar 14 01:05:41 2012
New Revision: 1300432

URL: http://svn.apache.org/viewvc?rev=1300432&view=rev
Log:
Added CAS-PGE support for multiple Property Adders

-----------
OODT-406

Added:
    oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java   (with props)
    oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java   (with props)
Modified:
    oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
    oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java
    oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java
    oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetadataKeys.java

Modified: oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
URL: http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java?rev=1300432&r1=1300431&r2=1300432&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java (original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java Wed Mar 14 01:05:41 2012
@@ -15,9 +15,12 @@
  * limitations under the License.
  */
 
-
 package org.apache.oodt.cas.pge;
 
+//OODT static imports
+import static 
org.apache.oodt.cas.pge.metadata.PgeTaskMetadataKeys.PROPERTY_ADDERS;
+import static 
org.apache.oodt.cas.pge.metadata.PgeTaskMetadataKeys.PROPERTY_ADDER_CLASSPATH;
+
 //JDK imports
 import java.util.LinkedList;
 import java.util.List;
@@ -62,409 +65,437 @@ import org.apache.oodt.cas.pge.writers.S
 //Spring imports
 import org.springframework.context.support.FileSystemXmlApplicationContext;
 
+import com.google.common.collect.Lists;
 
 /**
- * 
- * @author mattmann
- * @author bfoster
- * @version $Revision$
- * 
- * <p>
  * Runs a CAS-style Product Generation Executive based on the PCS Wrapper
- * Architecture from mattmann et al. on OCO
- * </p>.
+ * Architecture from mattmann et al. on OCO.
+ * 
+ * @author mattmann (Chris Mattmann)
+ * @author bfoster (Brian Foster)
  */
 public class PGETaskInstance implements WorkflowTaskInstance {
-    
-    protected static Logger LOG = 
Logger.getLogger(PGETaskInstance.class.getName());
-    
-    protected XmlRpcWorkflowManagerClient wm;
-
-    protected String workflowInstId;
-
-    protected PgeMetadata pgeMetadata;
-
-    protected PgeConfig pgeConfig;
-    
-    protected PGETaskInstance() {}
-
-    protected void initialize(Metadata metadata,
-            WorkflowTaskConfiguration config) throws InstantiationException {
-        try {
-            // merge metadata
-            this.pgeMetadata = createPgeMetadata(metadata, config);
-            
-            // create PgeConfig
-            this.pgeConfig = this.createPgeConfig();
-
-            // load property adder
-            String propertyAdderClasspath = this.pgeMetadata
-                    .getMetadata(PgeTaskMetadataKeys.PROPERTY_ADDER_CLASSPATH);
-            if (propertyAdderClasspath != null
-                    && !propertyAdderClasspath.equals(""))
-                this.runPropertyAdder(this
-                        .loadPropertyAdder(propertyAdderClasspath));
-
-            // configure workflow manager
-            wm = new XmlRpcWorkflowManagerClient(new URL(this.pgeMetadata
-                    .getMetadata(PcsMetadataKeys.WORKFLOW_MANAGER_URL)));
-            workflowInstId = this.pgeMetadata
-                    .getMetadata(CoreMetKeys.WORKFLOW_INST_ID);
 
-        } catch (Exception e) {
-            e.printStackTrace();
-            throw new InstantiationException(
-                    "Failed to instanciate PGETaskInstance : " + 
e.getMessage());
-        }
-    }
-
-    protected PgeMetadata createPgeMetadata(Metadata dynMetadata,
-          WorkflowTaskConfiguration config) {
-       Metadata staticMetadata = new Metadata();
-       for (Object key : config.getProperties().keySet()) {
-          staticMetadata.addMetadata((String) key,
-                config.getProperty((String) key));
-       }
-       return new PgeMetadata(staticMetadata, dynMetadata);
-    }
-
-    protected ConfigFilePropertyAdder loadPropertyAdder(
-            String propertyAdderClasspath) throws Exception {
-        return (ConfigFilePropertyAdder) Class.forName(propertyAdderClasspath)
-                .newInstance();
-    }
-
-    protected void runPropertyAdder(ConfigFilePropertyAdder propAdder) {
-        propAdder.addConfigProperties(this.pgeMetadata, this.pgeConfig
-                .getPropertyAdderCustomArgs());
-    }
-
-    protected PgeConfig createPgeConfig() throws Exception {
-        return new XmlFilePgeConfigBuilder().build(this.pgeMetadata);
-    }
-
-    protected void updateStatus(String status) {
-        try {
-            LOG.log(Level.INFO, "Updating status to workflow as " + status);
-            this.wm.updateWorkflowInstanceStatus(this.workflowInstId, status);
-        } catch (Exception e) {
-            LOG.log(Level.WARNING, "Failed to update to workflow as " + status 
-                    + " : " + e.getMessage());
-        }
-    }
-
-    protected void prePgeSetup() throws Exception {
-        this.createExeDir();
-        this.createOuputDirsIfRequested();
-        this.createSciPgeConfigFiles();
-    }
-
-    protected void createExeDir() {
-        LOG.log(Level.INFO, "Creating PGE execution working directory: ["
-                + this.pgeConfig.getExeDir() + "]");
-        new File(this.pgeConfig.getExeDir()).mkdirs();
-    }
-
-    protected void createOuputDirsIfRequested() {
-        for (OutputDir outputDir : this.pgeConfig.getOuputDirs()) {
-            if (outputDir.isCreateBeforeExe()) {
-                LOG.log(Level.INFO, "Creating PGE file ouput directory: ["
-                        + outputDir.getPath() + "]");
-                new File(outputDir.getPath()).mkdirs();
-            }
-        }
-    }
+   protected static Logger LOG = Logger.getLogger(PGETaskInstance.class
+         .getName());
 
-    protected void createSciPgeConfigFiles() throws IOException {
-        this.updateStatus(PgeTaskMetadataKeys.CONF_FILE_BUILD);
-        for (DynamicConfigFile dynamicConfigFile : pgeConfig
-                .getDynamicConfigFiles()) {
-            try {
-                this.createSciPgeConfigFile(dynamicConfigFile);
-            } catch (Exception e) {
-                e.printStackTrace();
-                throw new IOException(
-                        "Failed to created pge input config file ' "
-                                + dynamicConfigFile.getFilePath() + "' : "
-                                + e.getMessage());
-            }
-        }
-    }
+   protected XmlRpcWorkflowManagerClient wm;
 
-    protected void createSciPgeConfigFile(DynamicConfigFile dynamicConfigFile)
-            throws Exception {
-        File parentDir = new File(dynamicConfigFile.getFilePath())
-                .getParentFile();
-        if (!parentDir.exists())
-            parentDir.mkdirs();
-        SciPgeConfigFileWriter writer = (SciPgeConfigFileWriter) Class.forName(
-                dynamicConfigFile.getWriterClass()).newInstance();
-        writer.createConfigFile(dynamicConfigFile.getFilePath(),
-                this.pgeMetadata.asMetadata(), dynamicConfigFile.getArgs());
-    }
-
-    protected void processOutput() throws FileNotFoundException, IOException {
-        for (final OutputDir outputDir : this.pgeConfig.getOuputDirs()) {
-            File[] createdFiles = new File(outputDir.getPath()).listFiles();
-            for (File createdFile : createdFiles) {
-                Metadata outputMetadata = new Metadata();
-                for (RegExprOutputFiles regExprFiles : outputDir
-                        .getRegExprOutputFiles()) {
-                    if (Pattern.matches(regExprFiles.getRegExp(), createdFile
-                            .getName())) {
-                        try {
-                            PcsMetFileWriter writer = (PcsMetFileWriter) Class
-                                    .forName(regExprFiles.getConverterClass())
-                                    .newInstance();
-                            
outputMetadata.replaceMetadata(this.getMetadataForFile(
-                                                       
(regExprFiles.getRenamingConv() != null) 
-                                                           ? createdFile = 
this.renameFile(createdFile, regExprFiles.getRenamingConv())
-                                                           : createdFile, 
writer, regExprFiles.getArgs()));
-                        } catch (Exception e) {
-                            LOG.log(Level.SEVERE,
-                                    "Failed to create metadata file for '"
-                                            + createdFile + "' : "
-                                            + e.getMessage(), e);
-                        }
-                    }
-                }
-                if (outputMetadata.getAllKeys().size() > 0)
-                       this.writeFromMetadata(outputMetadata, 
createdFile.getAbsolutePath() 
-                                       + "." + 
this.pgeMetadata.getMetadata(PcsMetadataKeys.MET_FILE_EXT));
-            }
-        }
-    }
+   protected String workflowInstId;
 
-    protected File renameFile(File file, RenamingConv renamingConv) throws 
Exception {
-       Metadata curMetadata = this.pgeMetadata.asMetadata();
-       curMetadata.replaceMetadata(renamingConv.getTmpReplaceMet());
-       String newFileName = 
PathUtils.doDynamicReplacement(renamingConv.getRenamingString(), curMetadata);
-       File newFile = new File(file.getParentFile(), newFileName);
-        LOG.log(Level.INFO, "Renaming file '" + file.getAbsolutePath() 
-                       + "' to '" + newFile.getAbsolutePath() + "'");
-       if (!file.renameTo(newFile))
-               throw new IOException("Renaming returned false");
-       return newFile;
-    }
-    
-    protected Metadata getMetadataForFile(File sciPgeCreatedDataFile,
-            PcsMetFileWriter writer, Object[] args) throws Exception {
-        return writer.getMetadataForFile(sciPgeCreatedDataFile, 
this.pgeMetadata,
-                args);
-    }
-    
-    protected void writeFromMetadata(Metadata metadata, String toMetFilePath) 
-               throws FileNotFoundException, IOException {
-               new SerializableMetadata(metadata, "UTF-8", false)
-                               .writeMetadataToXmlStream(new 
FileOutputStream(toMetFilePath));
-       }
-
-    protected ScriptFile buildPgeRunScript() {
-        ScriptFile sf = new ScriptFile(this.pgeConfig.getShellType());
-        sf.setCommands(this.pgeConfig.getExeCmds());
-        return sf;
-    }
-
-    protected String getScriptPath() {
-        return new File(this.pgeConfig.getExeDir()).getAbsolutePath() + "/"
-                + this.getPgeScriptName();
-    }
-
-    protected String getPgeScriptName() {
-        return "sciPgeExeScript_"
-                + this.pgeMetadata.getMetadata(PgeTaskMetadataKeys.NAME);
-    }
-
-    protected Handler initializePgeLogHandler() throws SecurityException,
-                       IOException {
-       FileHandler handler = null;
-               String logFilePattern = this.pgeMetadata
-                               
.getMetadata(PgeTaskMetadataKeys.LOG_FILE_PATTERN);
-               if (logFilePattern != null) {
-                       LOG.log(Level.INFO, "Creating Log Handler to capture 
pge output to file '"
-                                                       + logFilePattern + "'");
-                       new File(logFilePattern).getParentFile().mkdirs();
-                       handler = new FileHandler(logFilePattern);
-                       handler.setFormatter(new SimpleFormatter());
-
-               }
-               return handler;
-       }
-    
-    protected Logger initializePgeLogger(Handler handler) {
-       if (handler != null) {
-               Logger pgeLogger = Logger.getLogger(this.pgeMetadata
-                                       .getMetadata(PgeTaskMetadataKeys.NAME)
-                                       + System.currentTimeMillis());
-                       pgeLogger.addHandler(handler);
-                       return pgeLogger;
-       }else {
-               return LOG;
-       }
-    }
-    
-    protected void closePgeLogHandler(Logger logger, Handler handler) {
-       if (logger != null && handler != null) {
-               logger.removeHandler(handler);
-               handler.close();
-       }
-    }
-    
-    protected void runPge() throws Exception {
-        ScriptFile sf = null;
-        Handler handler = null;
-        Logger pgeLogger = null;
-        try {
-            long startTime = System.currentTimeMillis();
-
-            // create script to run
-            sf = this.buildPgeRunScript();
-            sf.writeScriptFile(this.getScriptPath());
-
-            // run script and evaluate whether success or failure
-            handler = this.initializePgeLogHandler();
-            pgeLogger = this.initializePgeLogger(handler);
-            this.updateStatus(PgeTaskMetadataKeys.RUNNING_PGE);
-            if (!this.wasPgeSuccessful(ExecUtils.callProgram(this.pgeConfig
-                    .getShellType()
-                    + " " + this.getScriptPath(), pgeLogger, new 
File(this.pgeConfig
-                    .getExeDir()).getAbsoluteFile())))
-                throw new RuntimeException("Pge didn't finish successfully");
-            else
-                LOG.log(Level.INFO, "Successfully completed running: '"
-                        + sf.getCommands() + "'");
-           
-            
-            long endTime = System.currentTimeMillis();
-            this.pgeMetadata.replaceMetadata(PgeTaskMetadataKeys.PGE_RUNTIME,
-                    (endTime - startTime) + "");
+   protected PgeMetadata pgeMetadata;
 
-        } catch (Exception e) {
+   protected PgeConfig pgeConfig;
+
+   protected PGETaskInstance() {
+   }
+
+   protected void initialize(Metadata metadata, WorkflowTaskConfiguration 
config)
+         throws InstantiationException {
+      try {
+         // merge metadata
+         this.pgeMetadata = createPgeMetadata(metadata, config);
+
+         // create PgeConfig
+         this.pgeConfig = this.createPgeConfig();
+
+         // run Property Adders.
+         runPropertyAdders();
+
+         // configure workflow manager
+         wm = new XmlRpcWorkflowManagerClient(new URL(
+               this.pgeMetadata
+                     .getMetadata(PcsMetadataKeys.WORKFLOW_MANAGER_URL)));
+         workflowInstId = this.pgeMetadata
+               .getMetadata(CoreMetKeys.WORKFLOW_INST_ID);
+
+      } catch (Exception e) {
+         e.printStackTrace();
+         throw new InstantiationException(
+               "Failed to instanciate PGETaskInstance : " + e.getMessage());
+      }
+   }
+
+   protected PgeMetadata createPgeMetadata(Metadata dynMetadata,
+         WorkflowTaskConfiguration config) {
+      Metadata staticMetadata = new Metadata();
+      for (Object key : config.getProperties().keySet()) {
+         if (PROPERTY_ADDERS.equals((String) key)
+               || PROPERTY_ADDER_CLASSPATH.equals((String) key)) {
+            staticMetadata.addMetadata((String) key,
+                  Lists.newArrayList(config.getProperty((String) 
key).split(",")));            
+         } else {
+            staticMetadata.addMetadata((String) key,
+                  config.getProperty((String) key));
+         }
+      }
+      return new PgeMetadata(staticMetadata, dynMetadata);
+   }
+
+   protected void runPropertyAdders() throws Exception {
+      try {
+         List<String> propertyAdders = pgeMetadata
+               .getAllMetadata(PROPERTY_ADDERS);
+         if (propertyAdders == null) {
+            // Left for backwards compatibility.
+            propertyAdders = pgeMetadata
+                  .getAllMetadata(PROPERTY_ADDER_CLASSPATH);
+         }
+         if (propertyAdders != null) {
+            for (String propertyAdder : propertyAdders) {
+               runPropertyAdder(loadPropertyAdder(propertyAdder));
+            }
+         }
+      } catch (Exception e) {
+         throw new Exception("Failed to instanciate/run Property Adders : "
+               + e.getMessage(), e);
+      }
+   }
+
+   protected ConfigFilePropertyAdder loadPropertyAdder(
+         String propertyAdderClasspath) throws Exception {
+      return (ConfigFilePropertyAdder) Class.forName(propertyAdderClasspath)
+            .newInstance();
+   }
+
+   protected void runPropertyAdder(ConfigFilePropertyAdder propAdder) {
+      propAdder.addConfigProperties(this.pgeMetadata,
+            this.pgeConfig.getPropertyAdderCustomArgs());
+   }
+
+   protected PgeConfig createPgeConfig() throws Exception {
+      return new XmlFilePgeConfigBuilder().build(this.pgeMetadata);
+   }
+
+   protected void updateStatus(String status) {
+      try {
+         LOG.log(Level.INFO, "Updating status to workflow as " + status);
+         this.wm.updateWorkflowInstanceStatus(this.workflowInstId, status);
+      } catch (Exception e) {
+         LOG.log(Level.WARNING, "Failed to update to workflow as " + status
+               + " : " + e.getMessage());
+      }
+   }
+
+   protected void prePgeSetup() throws Exception {
+      this.createExeDir();
+      this.createOuputDirsIfRequested();
+      this.createSciPgeConfigFiles();
+   }
+
+   protected void createExeDir() {
+      LOG.log(Level.INFO, "Creating PGE execution working directory: ["
+            + this.pgeConfig.getExeDir() + "]");
+      new File(this.pgeConfig.getExeDir()).mkdirs();
+   }
+
+   protected void createOuputDirsIfRequested() {
+      for (OutputDir outputDir : this.pgeConfig.getOuputDirs()) {
+         if (outputDir.isCreateBeforeExe()) {
+            LOG.log(Level.INFO, "Creating PGE file ouput directory: ["
+                  + outputDir.getPath() + "]");
+            new File(outputDir.getPath()).mkdirs();
+         }
+      }
+   }
+
+   protected void createSciPgeConfigFiles() throws IOException {
+      this.updateStatus(PgeTaskMetadataKeys.CONF_FILE_BUILD);
+      for (DynamicConfigFile dynamicConfigFile : pgeConfig
+            .getDynamicConfigFiles()) {
+         try {
+            this.createSciPgeConfigFile(dynamicConfigFile);
+         } catch (Exception e) {
             e.printStackTrace();
-            throw new Exception("Exception when executing PGE commands '"
-                    + (sf != null ? sf.getCommands() : "NULL") + "' : "
-                    + e.getMessage());
-        }finally {
-               this.closePgeLogHandler(pgeLogger, handler);
-        }
-    }
-
-    protected boolean wasPgeSuccessful(int returnCode) {
-        return returnCode == 0;
-    }
-
-    protected void ingestProducts() throws Exception {
-        StdProductCrawler crawler = new StdProductCrawler();
-        this.setCrawlerConfigurations(crawler);
-        this.runIngestCrawler(crawler, this.getOutputDirs());
-    }
-
-    protected List<File> getOutputDirs() {
-        List<File> outputDirs = new LinkedList<File>();
-        for (OutputDir outputDir : pgeConfig.getOuputDirs())
-            outputDirs.add(new File(outputDir.getPath()));
-        return outputDirs;
-    }
-
-    protected void setCrawlerConfigurations(StdProductCrawler crawler)
-            throws Exception {
-        crawler.setMetFileExtension(this.pgeMetadata
-                .getMetadata(PcsMetadataKeys.MET_FILE_EXT));
-        crawler
-                .setClientTransferer(this.pgeMetadata
-                        
.getMetadata(PcsMetadataKeys.CLIENT_TRANSFER_SERVICE_FACTORY));
-        crawler.setFilemgrUrl(this.pgeMetadata
-                .getMetadata(PcsMetadataKeys.FILE_MANAGER_URL));
-        String actionRepoFile = this.pgeMetadata
-                .getMetadata(PcsMetadataKeys.ACTION_REPO_FILE);
-        if (actionRepoFile != null && !actionRepoFile.equals("")) {
-            crawler.setApplicationContext(new FileSystemXmlApplicationContext(
-                    actionRepoFile));
-            crawler.setActionIds(this.pgeMetadata
-                    .getAllMetadata(PcsMetadataKeys.ACTION_IDS));
-        }
-        crawler.setRequiredMetadata(this.pgeMetadata
-                .getAllMetadata(PcsMetadataKeys.REQUIRED_METADATA));
-        String crawlForDirsString = this.pgeMetadata
-                .getMetadata(PcsMetadataKeys.CRAWLER_CRAWL_FOR_DIRS);
-        boolean crawlForDirs = (crawlForDirsString != null) ? 
crawlForDirsString
-                .toLowerCase().equals("true")
-                : false;
-        String recurString = this.pgeMetadata
-                .getMetadata(PcsMetadataKeys.CRAWLER_RECUR);
-        boolean recur = (recurString != null) ? recurString.toLowerCase()
-                .equals("true") : true;
-        crawler.setCrawlForDirs(crawlForDirs);
-        crawler.setNoRecur(!recur);
-       LOG.log(Level.INFO, "Passing Workflow Metadata to CAS-Crawler as global 
metadata . . .");
-       
crawler.setGlobalMetadata(this.pgeMetadata.asMetadata(PgeMetadata.Type.DYNAMIC));
-    }
-
-    protected void runIngestCrawler(StdProductCrawler crawler,
-            List<File> crawlDirs) {
-       File currentDir = null;
-               try {
-                       this.updateStatus(PgeTaskMetadataKeys.CRAWLING);
-                       boolean attemptIngestAll = 
Boolean.parseBoolean(this.pgeMetadata
-                                       
.getMetadata(PgeTaskMetadataKeys.ATTEMPT_INGEST_ALL));
-                       for (File crawlDir : crawlDirs) {
-                               currentDir = crawlDir;
-                               LOG.log(Level.INFO,
-                                               "Executing StdProductCrawler in 
productPath: ["
-                                                               + crawlDir + 
"]");
-                               crawler.crawl(crawlDir);
-                               if (!attemptIngestAll)
-                                       this.verifyIngests(crawler);
-                       }
-                       if (attemptIngestAll)
-                               this.verifyIngests(crawler);
-               } catch (Exception e) {
-                       LOG.log(Level.WARNING,
-                                       "Failed while attempting to ingest 
products while crawling directory '"
-                                                       + currentDir
-                                                       + "' (all products may 
not have been ingested) : "
-                                                       + e.getMessage(), e);
-               }
-    }
-    
-    protected void verifyIngests(ProductCrawler crawler) throws Exception {
-       boolean ingestsSuccess = true;
-       String exceptionMsg = "";
-       for (IngestStatus status : crawler.getIngestStatus()) {
-               if (status.getResult().equals(IngestStatus.Result.FAILURE)) {
-                       exceptionMsg += (exceptionMsg.equals("") ? "" : " : ")
-                                               + "Failed to ingest product 
[file='"
-                                               + 
status.getProduct().getAbsolutePath() + "',result='"
-                                               + status.getResult() + 
"',msg='" + status.getMessage()
-                                               + "']";
-                       ingestsSuccess = false;
-               }else if 
(!status.getResult().equals(IngestStatus.Result.SUCCESS)) {
-                LOG.log(Level.WARNING, "Product was not ingested [file='"
-                                               + 
status.getProduct().getAbsolutePath() + "',result='"
-                                               + status.getResult() + 
"',msg='" + status.getMessage()
-                                               + "']");
-               }
-       }
-       if (!ingestsSuccess)
-               throw new Exception(exceptionMsg);
-    }
-    
-    protected void updateDynamicMetadata() {
-        this.pgeMetadata.commitMarkedDynamicMetadataKeys();
-    }
-
-    public void run(Metadata metadata, WorkflowTaskConfiguration config)
-            throws WorkflowTaskInstanceException {
-        try {
-            this.initialize(metadata, config);
-            this.prePgeSetup();
-            this.runPge();
-            this.processOutput();
-            this.updateDynamicMetadata();
-            this.ingestProducts();
-        } catch (Exception e) {
-            throw new WorkflowTaskInstanceException("PGETask failed : "
-                    + e.getMessage(), e);
-        }
-    }
+            throw new IOException("Failed to created pge input config file ' "
+                  + dynamicConfigFile.getFilePath() + "' : " + e.getMessage());
+         }
+      }
+   }
+
+   protected void createSciPgeConfigFile(DynamicConfigFile dynamicConfigFile)
+         throws Exception {
+      File parentDir = new File(dynamicConfigFile.getFilePath())
+            .getParentFile();
+      if (!parentDir.exists())
+         parentDir.mkdirs();
+      SciPgeConfigFileWriter writer = (SciPgeConfigFileWriter) Class.forName(
+            dynamicConfigFile.getWriterClass()).newInstance();
+      writer.createConfigFile(dynamicConfigFile.getFilePath(),
+            this.pgeMetadata.asMetadata(), dynamicConfigFile.getArgs());
+   }
+
+   protected void processOutput() throws FileNotFoundException, IOException {
+      for (final OutputDir outputDir : this.pgeConfig.getOuputDirs()) {
+         File[] createdFiles = new File(outputDir.getPath()).listFiles();
+         for (File createdFile : createdFiles) {
+            Metadata outputMetadata = new Metadata();
+            for (RegExprOutputFiles regExprFiles : outputDir
+                  .getRegExprOutputFiles()) {
+               if (Pattern.matches(regExprFiles.getRegExp(),
+                     createdFile.getName())) {
+                  try {
+                     PcsMetFileWriter writer = (PcsMetFileWriter) Class
+                           .forName(regExprFiles.getConverterClass())
+                           .newInstance();
+                     outputMetadata
+                           .replaceMetadata(this.getMetadataForFile(
+                                 (regExprFiles.getRenamingConv() != null) ? 
createdFile = this
+                                       .renameFile(createdFile,
+                                             regExprFiles.getRenamingConv())
+                                       : createdFile, writer, regExprFiles
+                                       .getArgs()));
+                  } catch (Exception e) {
+                     LOG.log(Level.SEVERE,
+                           "Failed to create metadata file for '" + createdFile
+                                 + "' : " + e.getMessage(), e);
+                  }
+               }
+            }
+            if (outputMetadata.getAllKeys().size() > 0)
+               this.writeFromMetadata(
+                     outputMetadata,
+                     createdFile.getAbsolutePath()
+                           + "."
+                           + this.pgeMetadata
+                                 .getMetadata(PcsMetadataKeys.MET_FILE_EXT));
+         }
+      }
+   }
+
+   protected File renameFile(File file, RenamingConv renamingConv)
+         throws Exception {
+      Metadata curMetadata = this.pgeMetadata.asMetadata();
+      curMetadata.replaceMetadata(renamingConv.getTmpReplaceMet());
+      String newFileName = PathUtils.doDynamicReplacement(
+            renamingConv.getRenamingString(), curMetadata);
+      File newFile = new File(file.getParentFile(), newFileName);
+      LOG.log(Level.INFO, "Renaming file '" + file.getAbsolutePath() + "' to '"
+            + newFile.getAbsolutePath() + "'");
+      if (!file.renameTo(newFile))
+         throw new IOException("Renaming returned false");
+      return newFile;
+   }
+
+   protected Metadata getMetadataForFile(File sciPgeCreatedDataFile,
+         PcsMetFileWriter writer, Object[] args) throws Exception {
+      return writer.getMetadataForFile(sciPgeCreatedDataFile, this.pgeMetadata,
+            args);
+   }
+
+   protected void writeFromMetadata(Metadata metadata, String toMetFilePath)
+         throws FileNotFoundException, IOException {
+      new SerializableMetadata(metadata, "UTF-8", false)
+            .writeMetadataToXmlStream(new FileOutputStream(toMetFilePath));
+   }
+
+   protected ScriptFile buildPgeRunScript() {
+      ScriptFile sf = new ScriptFile(this.pgeConfig.getShellType());
+      sf.setCommands(this.pgeConfig.getExeCmds());
+      return sf;
+   }
+
+   protected String getScriptPath() {
+      return new File(this.pgeConfig.getExeDir()).getAbsolutePath() + "/"
+            + this.getPgeScriptName();
+   }
+
+   protected String getPgeScriptName() {
+      return "sciPgeExeScript_"
+            + this.pgeMetadata.getMetadata(PgeTaskMetadataKeys.NAME);
+   }
+
+   protected Handler initializePgeLogHandler() throws SecurityException,
+         IOException {
+      FileHandler handler = null;
+      String logFilePattern = this.pgeMetadata
+            .getMetadata(PgeTaskMetadataKeys.LOG_FILE_PATTERN);
+      if (logFilePattern != null) {
+         LOG.log(Level.INFO,
+               "Creating Log Handler to capture pge output to file '"
+                     + logFilePattern + "'");
+         new File(logFilePattern).getParentFile().mkdirs();
+         handler = new FileHandler(logFilePattern);
+         handler.setFormatter(new SimpleFormatter());
+
+      }
+      return handler;
+   }
+
+   protected Logger initializePgeLogger(Handler handler) {
+      if (handler != null) {
+         Logger pgeLogger = Logger.getLogger(this.pgeMetadata
+               .getMetadata(PgeTaskMetadataKeys.NAME)
+               + System.currentTimeMillis());
+         pgeLogger.addHandler(handler);
+         return pgeLogger;
+      } else {
+         return LOG;
+      }
+   }
+
+   protected void closePgeLogHandler(Logger logger, Handler handler) {
+      if (logger != null && handler != null) {
+         logger.removeHandler(handler);
+         handler.close();
+      }
+   }
+
+   protected void runPge() throws Exception {
+      ScriptFile sf = null;
+      Handler handler = null;
+      Logger pgeLogger = null;
+      try {
+         long startTime = System.currentTimeMillis();
+
+         // create script to run
+         sf = this.buildPgeRunScript();
+         sf.writeScriptFile(this.getScriptPath());
+
+         // run script and evaluate whether success or failure
+         handler = this.initializePgeLogHandler();
+         pgeLogger = this.initializePgeLogger(handler);
+         this.updateStatus(PgeTaskMetadataKeys.RUNNING_PGE);
+         if (!this.wasPgeSuccessful(ExecUtils.callProgram(
+               this.pgeConfig.getShellType() + " " + this.getScriptPath(),
+               pgeLogger,
+               new File(this.pgeConfig.getExeDir()).getAbsoluteFile())))
+            throw new RuntimeException("Pge didn't finish successfully");
+         else
+            LOG.log(Level.INFO,
+                  "Successfully completed running: '" + sf.getCommands() + 
"'");
+
+         long endTime = System.currentTimeMillis();
+         this.pgeMetadata.replaceMetadata(PgeTaskMetadataKeys.PGE_RUNTIME,
+               (endTime - startTime) + "");
+
+      } catch (Exception e) {
+         e.printStackTrace();
+         throw new Exception("Exception when executing PGE commands '"
+               + (sf != null ? sf.getCommands() : "NULL") + "' : "
+               + e.getMessage());
+      } finally {
+         this.closePgeLogHandler(pgeLogger, handler);
+      }
+   }
+
+   protected boolean wasPgeSuccessful(int returnCode) {
+      return returnCode == 0;
+   }
+
+   protected void ingestProducts() throws Exception {
+      StdProductCrawler crawler = new StdProductCrawler();
+      this.setCrawlerConfigurations(crawler);
+      this.runIngestCrawler(crawler, this.getOutputDirs());
+   }
+
+   protected List<File> getOutputDirs() {
+      List<File> outputDirs = new LinkedList<File>();
+      for (OutputDir outputDir : pgeConfig.getOuputDirs())
+         outputDirs.add(new File(outputDir.getPath()));
+      return outputDirs;
+   }
+
+   protected void setCrawlerConfigurations(StdProductCrawler crawler)
+         throws Exception {
+      crawler.setMetFileExtension(this.pgeMetadata
+            .getMetadata(PcsMetadataKeys.MET_FILE_EXT));
+      crawler.setClientTransferer(this.pgeMetadata
+            .getMetadata(PcsMetadataKeys.CLIENT_TRANSFER_SERVICE_FACTORY));
+      crawler.setFilemgrUrl(this.pgeMetadata
+            .getMetadata(PcsMetadataKeys.FILE_MANAGER_URL));
+      String actionRepoFile = this.pgeMetadata
+            .getMetadata(PcsMetadataKeys.ACTION_REPO_FILE);
+      if (actionRepoFile != null && !actionRepoFile.equals("")) {
+         crawler.setApplicationContext(new FileSystemXmlApplicationContext(
+               actionRepoFile));
+         List<String> actionIds = pgeMetadata
+            .getAllMetadata(PcsMetadataKeys.ACTION_IDS);
+         if (actionIds != null) {
+            crawler.setActionIds(actionIds);
+         }
+      }
+      crawler.setRequiredMetadata(this.pgeMetadata
+            .getAllMetadata(PcsMetadataKeys.REQUIRED_METADATA));
+      String crawlForDirsString = this.pgeMetadata
+            .getMetadata(PcsMetadataKeys.CRAWLER_CRAWL_FOR_DIRS);
+      boolean crawlForDirs = (crawlForDirsString != null) ? crawlForDirsString
+            .toLowerCase().equals("true") : false;
+      String recurString = this.pgeMetadata
+            .getMetadata(PcsMetadataKeys.CRAWLER_RECUR);
+      boolean recur = (recurString != null) ? recurString.toLowerCase().equals(
+            "true") : true;
+      crawler.setCrawlForDirs(crawlForDirs);
+      crawler.setNoRecur(!recur);
+      LOG.log(Level.INFO,
+            "Passing Workflow Metadata to CAS-Crawler as global metadata . . 
.");
+      crawler.setGlobalMetadata(this.pgeMetadata
+            .asMetadata(PgeMetadata.Type.DYNAMIC));
+   }
+
+   protected void runIngestCrawler(StdProductCrawler crawler,
+         List<File> crawlDirs) {
+      File currentDir = null;
+      try {
+         this.updateStatus(PgeTaskMetadataKeys.CRAWLING);
+         boolean attemptIngestAll = Boolean.parseBoolean(this.pgeMetadata
+               .getMetadata(PgeTaskMetadataKeys.ATTEMPT_INGEST_ALL));
+         for (File crawlDir : crawlDirs) {
+            currentDir = crawlDir;
+            LOG.log(Level.INFO, "Executing StdProductCrawler in productPath: ["
+                  + crawlDir + "]");
+            crawler.crawl(crawlDir);
+            if (!attemptIngestAll)
+               this.verifyIngests(crawler);
+         }
+         if (attemptIngestAll)
+            this.verifyIngests(crawler);
+      } catch (Exception e) {
+         LOG.log(
+               Level.WARNING,
+               "Failed while attempting to ingest products while crawling 
directory '"
+                     + currentDir
+                     + "' (all products may not have been ingested) : "
+                     + e.getMessage(), e);
+      }
+   }
+
+   protected void verifyIngests(ProductCrawler crawler) throws Exception {
+      boolean ingestsSuccess = true;
+      String exceptionMsg = "";
+      for (IngestStatus status : crawler.getIngestStatus()) {
+         if (status.getResult().equals(IngestStatus.Result.FAILURE)) {
+            exceptionMsg += (exceptionMsg.equals("") ? "" : " : ")
+                  + "Failed to ingest product [file='"
+                  + status.getProduct().getAbsolutePath() + "',result='"
+                  + status.getResult() + "',msg='" + status.getMessage() + 
"']";
+            ingestsSuccess = false;
+         } else if (!status.getResult().equals(IngestStatus.Result.SUCCESS)) {
+            LOG.log(Level.WARNING, "Product was not ingested [file='"
+                  + status.getProduct().getAbsolutePath() + "',result='"
+                  + status.getResult() + "',msg='" + status.getMessage() + 
"']");
+         }
+      }
+      if (!ingestsSuccess)
+         throw new Exception(exceptionMsg);
+   }
+
+   /**
+    * Delegates to {@code pgeMetadata.commitMarkedDynamicMetadataKeys()}.
+    */
+   protected void updateDynamicMetadata() {
+      pgeMetadata.commitMarkedDynamicMetadataKeys();
+   }
+
+   /**
+    * Executes the task: initialize, pre-PGE setup, run the PGE, process its
+    * output, update dynamic metadata and ingest products, in that order.
+    *
+    * @param metadata
+    *           the metadata handed to {@code initialize}
+    * @param config
+    *           the task configuration handed to {@code initialize}
+    * @throws WorkflowTaskInstanceException
+    *            wrapping any exception thrown by one of the steps
+    */
+   public void run(Metadata metadata, WorkflowTaskConfiguration config)
+         throws WorkflowTaskInstanceException {
+      try {
+         initialize(metadata, config);
+         prePgeSetup();
+         runPge();
+         processOutput();
+         updateDynamicMetadata();
+         ingestProducts();
+      } catch (Exception failure) {
+         final String reason = "PGETask failed : " + failure.getMessage();
+         throw new WorkflowTaskInstanceException(reason, failure);
+      }
+   }
 }

Modified: 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java?rev=1300432&r1=1300431&r2=1300432&view=diff
==============================================================================
--- 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java
 (original)
+++ 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/config/XmlFilePgeConfigBuilder.java
 Wed Mar 14 01:05:41 2012
@@ -215,7 +215,10 @@ public class XmlFilePgeConfigBuilder imp
                         .toLowerCase().equals("true"))
                        localPgeMetadata.markAsDynamicMetadataKey(key);
                 
-                curPlusLocalMetadata.replaceMetadata(key, 
curPgeMetadata.getAllMetadata(key));
+                List<String> values = curPgeMetadata.getAllMetadata(key);
+                if (values != null) {
+                   curPlusLocalMetadata.replaceMetadata(key, values);
+                }
             }
         }
         return localPgeMetadata;

Modified: 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java?rev=1300432&r1=1300431&r2=1300432&view=diff
==============================================================================
--- 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java 
(original)
+++ 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java 
Wed Mar 14 01:05:41 2012
@@ -396,7 +396,10 @@ public class PgeMetadata {
             case LOCAL:
                combinedMetadata.replaceMetadata(localMetadata);
                for (String key : keyLinkMap.keySet()) {
-                  combinedMetadata.replaceMetadata(key, getAllMetadata(key));
+                  List<String> values = getAllMetadata(key);
+                  if (values != null) {
+                     combinedMetadata.replaceMetadata(key, values);
+                  }
                }
                break;
          }
@@ -446,7 +449,7 @@ public class PgeMetadata {
                break;
          }
       }
-      return Lists.newArrayList();
+      return null;
    }
 
    /**
@@ -455,6 +458,6 @@ public class PgeMetadata {
     */
    public String getMetadata(String key, Type... types) {
       List<String> values = getAllMetadata(key, types);
-      return (values.size() > 0) ? values.get(0) : null;
+      // getAllMetadata may return null, and a value list may also be empty;
+      // guard both so an empty list yields null instead of throwing.
+      return (values != null && !values.isEmpty()) ? values.get(0) : null;
    }
 }

Modified: 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetadataKeys.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetadataKeys.java?rev=1300432&r1=1300431&r2=1300432&view=diff
==============================================================================
--- 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetadataKeys.java
 (original)
+++ 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetadataKeys.java
 Wed Mar 14 01:05:41 2012
@@ -14,33 +14,39 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-
 package org.apache.oodt.cas.pge.metadata;
 
 /**
- * 
- * @author bfoster
- * @version $Revision$
+ * PGETaskInstance Reserved Metadata keys.
  *
- * <p>Describe your class here</p>.
+ * @author bfoster (Brian Foster)
  */
 public interface PgeTaskMetadataKeys {
 
     public static final String NAME = "PGETask_Name";
 
+    /** @deprecated Never used. */
+    @Deprecated
     public static final String SCI_EXE_PATH = "PGETask_SciExe_Path";
 
+    /** @deprecated Never used. */
+    @Deprecated
     public static final String SCI_EXE_VERSION = "PGETask_SciExe_Version";
 
+    /** @deprecated Never used. */
+    @Deprecated
     public static final String PRODUCT_PATH = "PGETask_ProductPath";
 
     public static final String CONFIG_FILE_PATH = "PGETask_ConfigFilePath";
     
     public static final String LOG_FILE_PATTERN = "PGETask_LogFilePattern";
 
+    /** @deprecated Use {@link #PROPERTY_ADDERS} instead. */
+    @Deprecated
     public static final String PROPERTY_ADDER_CLASSPATH = 
"PGETask_PropertyAdderClasspath";
 
+    public static final String PROPERTY_ADDERS = "PGETask_PropertyAdders";
+
     public static final String PGE_RUNTIME = "PGETask_Runtime";
 
     public static final String ATTEMPT_INGEST_ALL = "PGETask_AttemptIngestAll";

Added: 
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java?rev=1300432&view=auto
==============================================================================
--- 
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java
 (added)
+++ 
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java
 Wed Mar 14 01:05:41 2012
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge;
+
+//OODT imports
+import org.apache.oodt.cas.pge.metadata.PgeMetadata;
+
+/**
+ * Mock implementation of {@link ConfigFilePropertyAdder}.
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class MockConfigFilePropertyAdder implements ConfigFilePropertyAdder {
+
+   /** Metadata key under which the number of invocations is recorded. */
+   public static final String RUN_COUNTER =
+         "MockConfigFilePropertyAdder/RunCounter";
+
+   /**
+    * Stores {@code objs[1]} under the key {@code objs[0]} (both cast to
+    * String) and then increments the {@link #RUN_COUNTER} value in the given
+    * metadata, starting at "1" when no counter is present yet.
+    */
+   public void addConfigProperties(PgeMetadata metadata, Object... objs) {
+      metadata.replaceMetadata((String) objs[0], (String) objs[1]);
+
+      final String previousCount = metadata.getMetadata(RUN_COUNTER);
+      final int runs = (previousCount == null)
+            ? 1
+            : Integer.parseInt(previousCount) + 1;
+      metadata.replaceMetadata(RUN_COUNTER, Integer.toString(runs));
+   }
+}

Propchange: 
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/MockConfigFilePropertyAdder.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java?rev=1300432&view=auto
==============================================================================
--- oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java 
(added)
+++ oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java 
Wed Mar 14 01:05:41 2012
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge;
+
+//OODT static imports
+import static 
org.apache.oodt.cas.pge.metadata.PgeTaskMetadataKeys.PROPERTY_ADDERS;
+import static 
org.apache.oodt.cas.pge.metadata.PgeTaskMetadataKeys.PROPERTY_ADDER_CLASSPATH;
+
+//OODT imports
+import org.apache.oodt.cas.metadata.Metadata;
+import org.apache.oodt.cas.pge.PGETaskInstance;
+import org.apache.oodt.cas.pge.config.PgeConfig;
+import org.apache.oodt.cas.pge.metadata.PgeMetadata;
+
+//Google imports
+import com.google.common.collect.Lists;
+
+//JUnit imports
+import junit.framework.TestCase;
+
+/**
+ * Test class for {@link PGETaskInstance}.
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class TestPGETaskInstance extends TestCase {
+
+   /** Canonical class name of the mock property adder used by every case. */
+   private static final String MOCK_ADDER =
+         MockConfigFilePropertyAdder.class.getCanonicalName();
+
+   public void testLoadPropertyAdders() throws Exception {
+      ConfigFilePropertyAdder loaded =
+            new PGETaskInstance().loadPropertyAdder(MOCK_ADDER);
+      assertNotNull(loaded);
+      assertTrue(loaded instanceof MockConfigFilePropertyAdder);
+   }
+
+   public void testRunPropertyAdders() throws Exception {
+      PGETaskInstance pgeTask = new PGETaskInstance();
+
+      // Case 1: adder named through the deprecated static-metadata key.
+      Metadata staticMet = new Metadata();
+      staticMet.addMetadata(PROPERTY_ADDER_CLASSPATH, MOCK_ADDER);
+      Metadata dynMet = new Metadata();
+      configure(pgeTask, staticMet, dynMet);
+      pgeTask.runPropertyAdders();
+      assertEquals("value", pgeTask.pgeMetadata.getMetadata("key"));
+
+      // Case 2: a single adder named through PROPERTY_ADDERS.
+      staticMet = new Metadata();
+      dynMet = new Metadata();
+      dynMet.addMetadata(PROPERTY_ADDERS, MOCK_ADDER);
+      configure(pgeTask, staticMet, dynMet);
+      pgeTask.runPropertyAdders();
+      assertEquals("value", pgeTask.pgeMetadata.getMetadata("key"));
+      assertEquals("1",
+            pgeTask.pgeMetadata.getMetadata(MockConfigFilePropertyAdder.RUN_COUNTER));
+
+      // Case 3: the same adder listed twice must run twice.
+      staticMet = new Metadata();
+      dynMet = new Metadata();
+      dynMet.addMetadata(PROPERTY_ADDERS,
+            Lists.newArrayList(MOCK_ADDER, MOCK_ADDER));
+      configure(pgeTask, staticMet, dynMet);
+      pgeTask.runPropertyAdders();
+      assertEquals("value", pgeTask.pgeMetadata.getMetadata("key"));
+      assertEquals("2",
+            pgeTask.pgeMetadata.getMetadata(MockConfigFilePropertyAdder.RUN_COUNTER));
+   }
+
+   /**
+    * Installs fresh PGE metadata and a fresh PGE config (with the custom
+    * adder arguments "key" and "value") on the given task.
+    */
+   private static void configure(PGETaskInstance task, Metadata staticMet,
+         Metadata dynMet) {
+      task.pgeMetadata = new PgeMetadata(staticMet, dynMet);
+      task.pgeConfig = new PgeConfig();
+      task.pgeConfig.setPropertyAdderCustomArgs(new Object[] { "key", "value" });
+   }
+}

Propchange: 
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain


Reply via email to