Author: bfoster
Date: Fri Mar 16 21:09:30 2012
New Revision: 1301762

URL: http://svn.apache.org/viewvc?rev=1301762&view=rev
Log:
- Created Java Logger Handler for CAS-PGE. This handler allows multiple 
CAS-PGE instances to exist in the same JVM while routing the logging from 
each to its own log file in its execution directory
- adding support for PgeMetadata to take PgeTaskMetKeys directly
- some cleanup and reorganizing of PGETaskInstance

-------------
OODT-414

Added:
    oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/
    
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java 
  (with props)
    
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java  
 (with props)
    oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/
    
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java  
 (with props)
Modified:
    oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
    
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java
    
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java
    oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java

Modified: 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java?rev=1301762&r1=1301761&r2=1301762&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java 
(original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java 
Fri Mar 16 21:09:30 2012
@@ -24,7 +24,6 @@ import static org.apache.oodt.cas.pge.me
 import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.CRAWLER_RECUR;
 import static 
org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.INGEST_CLIENT_TRANSFER_SERVICE_FACTORY;
 import static 
org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.INGEST_FILE_MANAGER_URL;
-import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.LOG_FILE_PATTERN;
 import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.MET_FILE_EXT;
 import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.NAME;
 import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.PGE_RUNTIME;
@@ -40,17 +39,18 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.URL;
+import java.nio.CharBuffer;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.logging.FileHandler;
 import java.util.logging.Handler;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import java.util.logging.SimpleFormatter;
 import java.util.regex.Pattern;
 
 //OODT imports
+import org.apache.commons.lang.Validate;
 import org.apache.oodt.cas.crawl.ProductCrawler;
 import org.apache.oodt.cas.crawl.StdProductCrawler;
 import org.apache.oodt.cas.crawl.status.IngestStatus;
@@ -63,6 +63,8 @@ import org.apache.oodt.cas.pge.config.Pg
 import org.apache.oodt.cas.pge.config.RegExprOutputFiles;
 import org.apache.oodt.cas.pge.config.RenamingConv;
 import org.apache.oodt.cas.pge.config.XmlFilePgeConfigBuilder;
+import org.apache.oodt.cas.pge.logging.PgeLogHandler;
+import org.apache.oodt.cas.pge.logging.PgeLogRecord;
 import org.apache.oodt.cas.pge.metadata.PgeMetadata;
 import org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys;
 import org.apache.oodt.cas.pge.writers.PcsMetFileWriter;
@@ -91,69 +93,74 @@ import com.google.common.collect.Lists;
  */
 public class PGETaskInstance implements WorkflowTaskInstance {
 
-   protected static Logger LOG = Logger.getLogger(PGETaskInstance.class
-         .getName());
-
    protected XmlRpcWorkflowManagerClient wm;
-
    protected String workflowInstId;
-
    protected PgeMetadata pgeMetadata;
-
    protected PgeConfig pgeConfig;
 
-   protected PGETaskInstance() {
+   protected PGETaskInstance() {}
+
+   protected void updateStatus(String status) throws Exception {
+      log(Level.INFO, "Updating status to workflow as [" + status + "]");
+      wm.updateWorkflowInstanceStatus(workflowInstId, status);
    }
 
-   protected void initialize(Metadata metadata, WorkflowTaskConfiguration 
config)
-         throws InstantiationException {
-      try {
-         // merge metadata
-         this.pgeMetadata = createPgeMetadata(metadata, config);
+   protected Handler initializePgeLogger() throws Exception {
+      File logDir = new File(pgeConfig.getExeDir(), "logs");
+      if (!logDir.mkdirs()) {
+         throw new Exception("mkdirs for logs directory return false");
+      }
+      Handler handler = new PgeLogHandler(pgeMetadata.getMetadata(NAME),
+            new FileOutputStream(new File(logDir, createLogFileName())));
+      Logger.getLogger(PGETaskInstance.class.getName()).addHandler(handler);
+      return handler;
+   }
 
-         // create PgeConfig
-         this.pgeConfig = this.createPgeConfig();
+   protected String createLogFileName() {
+      return pgeMetadata.getMetadata(NAME) + "." + System.currentTimeMillis()
+            + ".log";
+   }
 
-         // run Property Adders.
-         runPropertyAdders();
+   protected void closePgeLogger(Handler handler) {
+      handler.close();
+      Logger.getLogger(PGETaskInstance.class.getName()).removeHandler(handler);
+   }
 
-         // configure workflow manager
-         wm = new XmlRpcWorkflowManagerClient(new URL(
-               this.pgeMetadata
-                     .getMetadata(WORKFLOW_MANAGER_URL.getName())));
-         workflowInstId = this.pgeMetadata
-               .getMetadata(CoreMetKeys.WORKFLOW_INST_ID);
+   protected void log(Level level, String message) {
+      Logger.getLogger(PGETaskInstance.class.getName()).log(
+            new PgeLogRecord(pgeMetadata.getMetadata(NAME), level, message));
+   }
 
-      } catch (Exception e) {
-         e.printStackTrace();
-         throw new InstantiationException(
-               "Failed to instanciate PGETaskInstance : " + e.getMessage());
-      }
+   protected void log(Level level, String message, Throwable t) {
+      Logger.getLogger(PGETaskInstance.class.getName()).log(
+            new PgeLogRecord(pgeMetadata.getMetadata(NAME), level, message, 
t));
    }
 
    protected PgeMetadata createPgeMetadata(Metadata dynMetadata,
-         WorkflowTaskConfiguration config) {
+         WorkflowTaskConfiguration config) throws Exception {
       Metadata staticMetadata = new Metadata();
-      for (Object key : config.getProperties().keySet()) {
-         PgeTaskMetKeys metKey = PgeTaskMetKeys.getByName((String) key);
+      for (Object objKey : config.getProperties().keySet()) {
+         String key = (String) objKey;
+         PgeTaskMetKeys metKey = PgeTaskMetKeys.getByName(key);
          if (metKey != null && metKey.isVector()) {
-            staticMetadata.addMetadata(
-                  (String) key,
+            staticMetadata.addMetadata(key,
                   Lists.newArrayList(Splitter.on(",").trimResults()
                         .omitEmptyStrings()
-                        .split(config.getProperty((String) key))));
+                        .split(config.getProperty(key))));
          } else {
-            staticMetadata.addMetadata((String) key,
-                  config.getProperty((String) key));
+            staticMetadata.addMetadata(key, config.getProperty(key));
          }
       }
       return new PgeMetadata(staticMetadata, dynMetadata);
    }
 
+   protected PgeConfig createPgeConfig() throws Exception {
+      return new XmlFilePgeConfigBuilder().build(pgeMetadata);
+   }
+
    protected void runPropertyAdders() throws Exception {
       try {
-         List<String> propertyAdders = pgeMetadata
-               .getAllMetadata(PROPERTY_ADDERS.getName());
+         List<String> propertyAdders = 
pgeMetadata.getAllMetadata(PROPERTY_ADDERS);
          if (propertyAdders != null) {
             for (String propertyAdder : propertyAdders) {
                runPropertyAdder(loadPropertyAdder(propertyAdder));
@@ -171,71 +178,79 @@ public class PGETaskInstance implements 
             .newInstance();
    }
 
-   protected void runPropertyAdder(ConfigFilePropertyAdder propAdder) {
-      propAdder.addConfigProperties(this.pgeMetadata,
-            this.pgeConfig.getPropertyAdderCustomArgs());
-   }
-
-   protected PgeConfig createPgeConfig() throws Exception {
-      return new XmlFilePgeConfigBuilder().build(this.pgeMetadata);
+   protected void runPropertyAdder(ConfigFilePropertyAdder propAdder)
+         throws Exception {
+      propAdder.addConfigProperties(pgeMetadata,
+            pgeConfig.getPropertyAdderCustomArgs());
    }
 
-   protected void updateStatus(String status) {
-      try {
-         LOG.log(Level.INFO, "Updating status to workflow as " + status);
-         this.wm.updateWorkflowInstanceStatus(this.workflowInstId, status);
-      } catch (Exception e) {
-         LOG.log(Level.WARNING, "Failed to update to workflow as " + status
-               + " : " + e.getMessage());
-      }
+   protected XmlRpcWorkflowManagerClient createWorkflowManagerClient()
+         throws Exception {
+      String urlString = pgeMetadata.getMetadata(WORKFLOW_MANAGER_URL);
+      Validate.notNull(urlString, "Must specify " + WORKFLOW_MANAGER_URL);
+      return new XmlRpcWorkflowManagerClient(new URL(urlString));
    }
 
-   protected void prePgeSetup() throws Exception {
-      this.createExeDir();
-      this.createOuputDirsIfRequested();
-      this.createSciPgeConfigFiles();
+   protected String getWorkflowInstanceId() throws Exception {
+      String instanceId = 
pgeMetadata.getMetadata(CoreMetKeys.WORKFLOW_INST_ID);
+      Validate.notNull(instanceId, "Must specify "
+            + CoreMetKeys.WORKFLOW_INST_ID);
+      return instanceId;
    }
 
-   protected void createExeDir() {
-      LOG.log(Level.INFO, "Creating PGE execution working directory: ["
-            + this.pgeConfig.getExeDir() + "]");
-      new File(this.pgeConfig.getExeDir()).mkdirs();
+   protected void createExeDir() throws Exception {
+      log(Level.INFO, "Creating PGE execution working directory: ["
+            + pgeConfig.getExeDir() + "]");
+      if (!new File(pgeConfig.getExeDir()).mkdirs()) {
+         throw new Exception("mkdirs returned false for creating ["
+               + pgeConfig.getExeDir() + "]");
+      }
    }
 
-   protected void createOuputDirsIfRequested() {
-      for (OutputDir outputDir : this.pgeConfig.getOuputDirs()) {
+   protected void createOuputDirsIfRequested() throws Exception {
+      for (OutputDir outputDir : pgeConfig.getOuputDirs()) {
          if (outputDir.isCreateBeforeExe()) {
-            LOG.log(Level.INFO, "Creating PGE file ouput directory: ["
+            log(Level.INFO, "Creating PGE file ouput directory: ["
                   + outputDir.getPath() + "]");
-            new File(outputDir.getPath()).mkdirs();
+            if (!new File(outputDir.getPath()).mkdirs()) {
+               throw new Exception("mkdir returned false for creating ["
+                     + outputDir.getPath() + "]");
+            }
          }
       }
    }
 
-   protected void createSciPgeConfigFiles() throws IOException {
-      this.updateStatus(CONF_FILE_BUILD.getWorkflowStatusName());
+   protected void createSciPgeConfigFiles() throws Exception {
+      log(Level.INFO, "Starting creation of science PGE files...");
       for (DynamicConfigFile dynamicConfigFile : pgeConfig
             .getDynamicConfigFiles()) {
-         try {
-            this.createSciPgeConfigFile(dynamicConfigFile);
-         } catch (Exception e) {
-            e.printStackTrace();
-            throw new IOException("Failed to created pge input config file ' "
-                  + dynamicConfigFile.getFilePath() + "' : " + e.getMessage());
-         }
+         createSciPgeConfigFile(dynamicConfigFile);
       }
+      log(Level.INFO, "Successfully wrote all science PGE files!");
    }
 
    protected void createSciPgeConfigFile(DynamicConfigFile dynamicConfigFile)
          throws Exception {
+      Validate.notNull(dynamicConfigFile, "dynamicConfigFile cannot be null");
+      log(Level.INFO, "Starting creation of science PGE file [" + 
dynamicConfigFile.getFilePath() + "]...");
+
+      // Create parent directory if it doesn't exist.
       File parentDir = new File(dynamicConfigFile.getFilePath())
             .getParentFile();
-      if (!parentDir.exists())
+      if (!parentDir.exists()) {
          parentDir.mkdirs();
+      }
+
+      // Load writer and write file.
+      log(Level.INFO, "Loading writer class for science PGE file [" + 
dynamicConfigFile.getFilePath() + "]...");
       SciPgeConfigFileWriter writer = (SciPgeConfigFileWriter) Class.forName(
             dynamicConfigFile.getWriterClass()).newInstance();
+      log(Level.INFO, "Loaded writer [" + writer.getClass().getCanonicalName()
+            + "] for science PGE file [" + dynamicConfigFile.getFilePath()
+            + "]...");
+      log(Level.INFO, "Writing science PGE file [" + 
dynamicConfigFile.getFilePath() + "]...");
       writer.createConfigFile(dynamicConfigFile.getFilePath(),
-            this.pgeMetadata.asMetadata(), dynamicConfigFile.getArgs());
+            pgeMetadata.asMetadata(), dynamicConfigFile.getArgs());
    }
 
    protected void processOutput() throws FileNotFoundException, IOException {
@@ -259,7 +274,7 @@ public class PGETaskInstance implements 
                                        : createdFile, writer, regExprFiles
                                        .getArgs()));
                   } catch (Exception e) {
-                     LOG.log(Level.SEVERE,
+                     log(Level.SEVERE,
                            "Failed to create metadata file for '" + createdFile
                                  + "' : " + e.getMessage(), e);
                   }
@@ -270,8 +285,7 @@ public class PGETaskInstance implements 
                      outputMetadata,
                      createdFile.getAbsolutePath()
                            + "."
-                           + this.pgeMetadata
-                                 .getMetadata(MET_FILE_EXT.getName()));
+                           + pgeMetadata.getMetadata(MET_FILE_EXT));
          }
       }
    }
@@ -283,7 +297,7 @@ public class PGETaskInstance implements 
       String newFileName = PathUtils.doDynamicReplacement(
             renamingConv.getRenamingString(), curMetadata);
       File newFile = new File(file.getParentFile(), newFileName);
-      LOG.log(Level.INFO, "Renaming file '" + file.getAbsolutePath() + "' to '"
+      log(Level.INFO, "Renaming file '" + file.getAbsolutePath() + "' to '"
             + newFile.getAbsolutePath() + "'");
       if (!file.renameTo(newFile))
          throw new IOException("Renaming returned false");
@@ -314,50 +328,13 @@ public class PGETaskInstance implements 
    }
 
    protected String getPgeScriptName() {
-      return "sciPgeExeScript_"
-            + this.pgeMetadata.getMetadata(NAME.getName());
-   }
-
-   protected Handler initializePgeLogHandler() throws SecurityException,
-         IOException {
-      FileHandler handler = null;
-      String logFilePattern = this.pgeMetadata
-            .getMetadata(LOG_FILE_PATTERN.getName());
-      if (logFilePattern != null) {
-         LOG.log(Level.INFO,
-               "Creating Log Handler to capture pge output to file '"
-                     + logFilePattern + "'");
-         new File(logFilePattern).getParentFile().mkdirs();
-         handler = new FileHandler(logFilePattern);
-         handler.setFormatter(new SimpleFormatter());
-
-      }
-      return handler;
-   }
-
-   protected Logger initializePgeLogger(Handler handler) {
-      if (handler != null) {
-         Logger pgeLogger = Logger.getLogger(this.pgeMetadata
-               .getMetadata(NAME.getName())
-               + System.currentTimeMillis());
-         pgeLogger.addHandler(handler);
-         return pgeLogger;
-      } else {
-         return LOG;
-      }
-   }
-
-   protected void closePgeLogHandler(Logger logger, Handler handler) {
-      if (logger != null && handler != null) {
-         logger.removeHandler(handler);
-         handler.close();
-      }
+      return "sciPgeExeScript_" + this.pgeMetadata.getMetadata(NAME);
    }
 
    protected void runPge() throws Exception {
       ScriptFile sf = null;
-      Handler handler = null;
-      Logger pgeLogger = null;
+      OutputStream stdOS = createStdOutLogger();
+      OutputStream errOS = createStdErrLogger();
       try {
          long startTime = System.currentTimeMillis();
 
@@ -366,32 +343,37 @@ public class PGETaskInstance implements 
          sf.writeScriptFile(this.getScriptPath());
 
          // run script and evaluate whether success or failure
-         handler = this.initializePgeLogHandler();
-         pgeLogger = this.initializePgeLogger(handler);
          this.updateStatus(RUNNING_PGE.getWorkflowStatusName());
          if (!this.wasPgeSuccessful(ExecUtils.callProgram(
                this.pgeConfig.getShellType() + " " + this.getScriptPath(),
-               pgeLogger,
+               stdOS, errOS,
                new File(this.pgeConfig.getExeDir()).getAbsoluteFile())))
             throw new RuntimeException("Pge didn't finish successfully");
          else
-            LOG.log(Level.INFO,
+            log(Level.INFO,
                   "Successfully completed running: '" + sf.getCommands() + 
"'");
 
          long endTime = System.currentTimeMillis();
-         this.pgeMetadata.replaceMetadata(PGE_RUNTIME.getName(),
-               (endTime - startTime) + "");
+         pgeMetadata.replaceMetadata(PGE_RUNTIME, (endTime - startTime) + "");
 
       } catch (Exception e) {
-         e.printStackTrace();
          throw new Exception("Exception when executing PGE commands '"
                + (sf != null ? sf.getCommands() : "NULL") + "' : "
-               + e.getMessage());
+               + e.getMessage(), e);
       } finally {
-         this.closePgeLogHandler(pgeLogger, handler);
+         try { stdOS.close(); } catch (Exception e) {}
+         try { errOS.close(); } catch (Exception e) {}
       }
    }
 
+   protected LoggerOuputStream createStdOutLogger() {
+      return new LoggerOuputStream(Level.INFO);
+   }
+
+   protected LoggerOuputStream createStdErrLogger() {
+      return new LoggerOuputStream(Level.SEVERE);
+   }
+
    protected boolean wasPgeSuccessful(int returnCode) {
       return returnCode == 0;
    }
@@ -411,36 +393,31 @@ public class PGETaskInstance implements 
 
    protected void setCrawlerConfigurations(StdProductCrawler crawler)
          throws Exception {
-      crawler.setMetFileExtension(this.pgeMetadata
-            .getMetadata(MET_FILE_EXT.getName()));
-      crawler.setClientTransferer(this.pgeMetadata
-            .getMetadata(INGEST_CLIENT_TRANSFER_SERVICE_FACTORY.getName()));
-      crawler.setFilemgrUrl(this.pgeMetadata
-            .getMetadata(INGEST_FILE_MANAGER_URL.getName()));
-      String actionRepoFile = this.pgeMetadata
-            .getMetadata(ACTION_REPO_FILE.getName());
+      crawler.setMetFileExtension(pgeMetadata.getMetadata(MET_FILE_EXT));
+      crawler.setClientTransferer(pgeMetadata
+            .getMetadata(INGEST_CLIENT_TRANSFER_SERVICE_FACTORY));
+      crawler.setFilemgrUrl(pgeMetadata.getMetadata(INGEST_FILE_MANAGER_URL));
+      String actionRepoFile = pgeMetadata.getMetadata(ACTION_REPO_FILE);
       if (actionRepoFile != null && !actionRepoFile.equals("")) {
          crawler.setApplicationContext(new FileSystemXmlApplicationContext(
                actionRepoFile));
-         List<String> actionIds = pgeMetadata
-            .getAllMetadata(ACTION_IDS.getName());
+         List<String> actionIds = pgeMetadata.getAllMetadata(ACTION_IDS);
          if (actionIds != null) {
             crawler.setActionIds(actionIds);
          }
       }
-      crawler.setRequiredMetadata(this.pgeMetadata
-            .getAllMetadata(REQUIRED_METADATA.getName()));
-      String crawlForDirsString = this.pgeMetadata
-            .getMetadata(CRAWLER_CRAWL_FOR_DIRS.getName());
+      crawler.setRequiredMetadata(
+            pgeMetadata.getAllMetadata(REQUIRED_METADATA));
+      String crawlForDirsString = pgeMetadata
+            .getMetadata(CRAWLER_CRAWL_FOR_DIRS);
       boolean crawlForDirs = (crawlForDirsString != null) ? crawlForDirsString
             .toLowerCase().equals("true") : false;
-      String recurString = this.pgeMetadata
-            .getMetadata(CRAWLER_RECUR.getName());
+      String recurString = pgeMetadata.getMetadata(CRAWLER_RECUR);
       boolean recur = (recurString != null) ? recurString.toLowerCase().equals(
             "true") : true;
       crawler.setCrawlForDirs(crawlForDirs);
       crawler.setNoRecur(!recur);
-      LOG.log(Level.INFO,
+      log(Level.INFO,
             "Passing Workflow Metadata to CAS-Crawler as global metadata . . 
.");
       crawler.setGlobalMetadata(this.pgeMetadata
             .asMetadata(PgeMetadata.Type.DYNAMIC));
@@ -451,11 +428,11 @@ public class PGETaskInstance implements 
       File currentDir = null;
       try {
          this.updateStatus(CRAWLING.getWorkflowStatusName());
-         boolean attemptIngestAll = Boolean.parseBoolean(this.pgeMetadata
-               .getMetadata(ATTEMPT_INGEST_ALL.getName()));
+         boolean attemptIngestAll = Boolean.parseBoolean(pgeMetadata
+               .getMetadata(ATTEMPT_INGEST_ALL));
          for (File crawlDir : crawlDirs) {
             currentDir = crawlDir;
-            LOG.log(Level.INFO, "Executing StdProductCrawler in productPath: ["
+            log(Level.INFO, "Executing StdProductCrawler in productPath: ["
                   + crawlDir + "]");
             crawler.crawl(crawlDir);
             if (!attemptIngestAll)
@@ -464,9 +441,8 @@ public class PGETaskInstance implements 
          if (attemptIngestAll)
             this.verifyIngests(crawler);
       } catch (Exception e) {
-         LOG.log(
-               Level.WARNING,
-               "Failed while attempting to ingest products while crawling 
directory '"
+         log(Level.WARNING,
+             "Failed while attempting to ingest products while crawling 
directory '"
                      + currentDir
                      + "' (all products may not have been ingested) : "
                      + e.getMessage(), e);
@@ -484,7 +460,7 @@ public class PGETaskInstance implements 
                   + status.getResult() + "',msg='" + status.getMessage() + 
"']";
             ingestsSuccess = false;
          } else if (!status.getResult().equals(IngestStatus.Result.SUCCESS)) {
-            LOG.log(Level.WARNING, "Product was not ingested [file='"
+            log(Level.WARNING, "Product was not ingested [file='"
                   + status.getProduct().getAbsolutePath() + "',result='"
                   + status.getResult() + "',msg='" + status.getMessage() + 
"']");
          }
@@ -494,21 +470,90 @@ public class PGETaskInstance implements 
    }
 
    protected void updateDynamicMetadata() {
-      this.pgeMetadata.commitMarkedDynamicMetadataKeys();
+      pgeMetadata.commitMarkedDynamicMetadataKeys();
    }
 
    public void run(Metadata metadata, WorkflowTaskConfiguration config)
          throws WorkflowTaskInstanceException {
+      Handler handler = null;
       try {
-         this.initialize(metadata, config);
-         this.prePgeSetup();
-         this.runPge();
-         this.processOutput();
-         this.updateDynamicMetadata();
-         this.ingestProducts();
+         // Initialize CAS-PGE.
+         pgeMetadata = createPgeMetadata(metadata, config);
+         pgeConfig = createPgeConfig();
+         runPropertyAdders();
+         wm = createWorkflowManagerClient();
+         workflowInstId = getWorkflowInstanceId();
+
+         // Initialize Logger.
+         handler = initializePgeLogger();
+
+         // Setup the PGE.
+         createExeDir();
+         createOuputDirsIfRequested();
+         updateStatus(CONF_FILE_BUILD.getWorkflowStatusName());
+         createSciPgeConfigFiles();
+
+         // Run the PGE and process its data.
+         runPge();
+
+         // Update metadata.
+         processOutput();
+         updateDynamicMetadata();
+
+         // Ingest products.
+         ingestProducts();
       } catch (Exception e) {
          throw new WorkflowTaskInstanceException("PGETask failed : "
                + e.getMessage(), e);
+      } finally {
+         closePgeLogger(handler);
+      }
+   }
+
+   /**
+    * OutputStream which wraps {@link PGETaskInstance}'s
+    * {@link PGETaskInstance#log(Level, String)} method.
+    *
+    * @author bfoster (Brian Foster)
+    */
+   protected class LoggerOuputStream extends OutputStream {
+
+      private CharBuffer buffer;
+      private Level level;
+
+      public LoggerOuputStream(Level level) {
+         this(level, 512);
+      }
+
+      public LoggerOuputStream(Level level, int bufferSize) {
+         this.level = level;
+         buffer = CharBuffer.wrap(new char[bufferSize]);
+      }
+
+      @Override
+      public void write(int b) throws IOException {
+         if (!buffer.hasRemaining()) {
+            flush();
+         }
+         buffer.put((char) b);
+      }
+
+      @Override
+      public void flush() {
+         System.out.println("HELLO");
+         if (buffer.position() > 0) {
+            char[] flushContext = new char[buffer.position()];
+            System.arraycopy(buffer.array(), 0, flushContext, 0,
+                  buffer.position());
+            log(level, new String(flushContext));
+            buffer.clear();
+         }
+      }
+
+      @Override
+      public void close() throws IOException {
+         flush();
+         super.close();
       }
    }
 }

Added: 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java?rev=1301762&view=auto
==============================================================================
--- 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java 
(added)
+++ 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java 
Fri Mar 16 21:09:30 2012
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge.logging;
+
+//JDK imports
+import java.io.FileNotFoundException;
+import java.io.OutputStream;
+import java.util.logging.LogRecord;
+import java.util.logging.SimpleFormatter;
+import java.util.logging.StreamHandler;
+
+/**
+ * Log Handler which only writes logs for a given CAS-PGE name.  This allows for
+ * multiple CAS-PGE instances to exist in the same JVM but still create their
+ * own log files.
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class PgeLogHandler extends StreamHandler {
+
+   private String pgeName;
+
+   public PgeLogHandler(String pgeName, OutputStream os)
+         throws SecurityException, FileNotFoundException {
+      super(os, new SimpleFormatter());
+      this.pgeName = pgeName;
+   }
+
+   @Override
+   public void publish(LogRecord record) {
+      if (record instanceof PgeLogRecord
+            && pgeName.equals(((PgeLogRecord) record).getPgeName())) {
+         super.publish(record);
+      }
+   }
+}

Propchange: 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java?rev=1301762&view=auto
==============================================================================
--- 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java 
(added)
+++ 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java 
Fri Mar 16 21:09:30 2012
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge.logging;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+
+/**
+ * CAS-PGE's {@link LogRecord}.
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class PgeLogRecord extends LogRecord {
+
+   private static final long serialVersionUID = 2334166761035931387L;
+
+   private String pgeName;
+
+   public PgeLogRecord(String pgeName, Level level, String msg) {
+      super(level, msg);
+      this.pgeName = pgeName;
+   }
+
+   public PgeLogRecord(String pgeName, Level level, String msg, Throwable t) {
+      this(pgeName, level, msg);
+      setThrown(t);
+   }
+
+   public String getPgeName() {
+      return pgeName;
+   }
+}

Propchange: 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java?rev=1301762&r1=1301761&r2=1301762&view=diff
==============================================================================
--- 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java 
(original)
+++ 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java 
Fri Mar 16 21:09:30 2012
@@ -296,6 +296,12 @@ public class PgeMetadata {
       return keyPath;
    }
 
+   public void replaceMetadata(PgeTaskMetKeys key, String value) {
+      Validate.notNull(key, "key cannot be null");
+
+      replaceMetadata(key.getName(), value);
+   }
+
    /**
     * Replace the given key's value with the given value. If the given key is a
     * key link, then it will update the value of the key it is linked to if 
that
@@ -338,6 +344,12 @@ public class PgeMetadata {
       }
    }
 
+   public void replaceMetadata(PgeTaskMetKeys key, List<String> values) {
+      Validate.notNull(key, "key cannot be null");
+
+      replaceMetadata(key.getName(), values);
+   }
+
    /**
     * Replace the given key's values with the given values. If the given key is
     * a key link, then it will update the values of the key it is linked to if
@@ -407,6 +419,10 @@ public class PgeMetadata {
       return combinedMetadata;
    }
 
+   public List<String> getAllMetadata(PgeTaskMetKeys key, Type... types) {
+      return getAllMetadata(key.getName(), types);
+   }
+
    /**
     * Get metadata values for given key. If Types are specified then it provides
     * the precedence order in which to search for the key. If no Type args are
@@ -452,6 +468,10 @@ public class PgeMetadata {
       return null;
    }
 
+   public String getMetadata(PgeTaskMetKeys key, Type... types) {
+      return getMetadata(key.getName(), types);
+   }
+
    /**
     * Returns the first value returned by {@link #getAllMetadata(String, Type...)}, if it returns
     * null then this method will also return null.

Modified: 
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java?rev=1301762&r1=1301761&r2=1301762&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java (original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java Fri Mar 16 21:09:30 2012
@@ -39,12 +39,6 @@ public enum PgeTaskMetKeys {
          "PGETask/ConfigFilePath",
          "PGETask_ConfigFilePath"),
    /**
-    * Java logger pattern used to create log files for CAS-PGE.
-    */
-   LOG_FILE_PATTERN(
-         "PGETask/LogFilePattern",
-         "PGETask_LogFilePattern"),
-   /**
     * List of {@link ConfigFilePropertyAdder}s classpaths to be run.
     */
    PROPERTY_ADDERS(
@@ -166,4 +160,9 @@ public enum PgeTaskMetKeys {
       }
       return null;
    }
+
+   @Override
+   public String toString() {
+      return getName();
+   }
 }

Modified: 
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java?rev=1301762&r1=1301761&r2=1301762&view=diff
==============================================================================
--- oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java (original)
+++ oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java Fri Mar 16 21:09:30 2012
@@ -17,11 +17,23 @@
 package org.apache.oodt.cas.pge;
 
 //OODT static imports
+import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.NAME;
 import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.PROPERTY_ADDERS;
 
+//JDK imports
+import java.io.File;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+
+//Apache imports
+import org.apache.commons.io.FileUtils;
+
 //OODT imports
 import org.apache.oodt.cas.metadata.Metadata;
 import org.apache.oodt.cas.pge.PGETaskInstance;
+import org.apache.oodt.cas.pge.PGETaskInstance.LoggerOuputStream;
 import org.apache.oodt.cas.pge.config.PgeConfig;
 import org.apache.oodt.cas.pge.metadata.PgeMetadata;
 import org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys;
@@ -93,7 +105,7 @@ public class TestPGETaskInstance extends
                   .getMetadata(MockConfigFilePropertyAdder.RUN_COUNTER));
    }
 
-   public void testCreatePgeMetadata() {
+   public void testCreatePgeMetadata() throws Exception {
       final String PGE_NAME = "PGE_Test";
       final String PGE_REQUIRED_METADATA = "Filename, FileLocation ";
       final String PROP_ADDERS = 
"some.prop.adder.classpath,some.other.classpath";
@@ -143,4 +155,73 @@ public class TestPGETaskInstance extends
             pgeMet.getAllMetadata(PgeTaskMetKeys.PROPERTY_ADDERS.getName())
                   .get(0));
    }
+
+   public void testLogger() throws Exception {
+      File tmpFile = File.createTempFile("bogus", "bogus");
+      File tmpDir = tmpFile.getParentFile();
+      tmpFile.delete();
+      File tmpDir1 = new File(tmpDir, UUID.randomUUID().toString());
+      assertTrue(tmpDir1.mkdirs());
+      File tmpDir2 = new File(tmpDir, UUID.randomUUID().toString());
+      assertTrue(tmpDir2.mkdirs());
+      File tmpDir3 = new File(tmpDir, UUID.randomUUID().toString());
+      assertTrue(tmpDir3.mkdirs());
+
+      final String PGE_1_NAME = "PGE1";
+      PGETaskInstance pgeTask1 = new PGETaskInstance();
+      pgeTask1.pgeMetadata = new PgeMetadata();
+      pgeTask1.pgeMetadata.replaceMetadata(NAME, PGE_1_NAME);
+      pgeTask1.pgeConfig = new PgeConfig();
+      pgeTask1.pgeConfig.setExeDir(tmpDir1.getAbsolutePath());
+      Handler handler1 = pgeTask1.initializePgeLogger();
+      pgeTask1.log(Level.INFO, "pge1 message1");
+      pgeTask1.log(Level.INFO, "pge1 message2");
+      pgeTask1.log(Level.INFO, "pge1 message3");
+      pgeTask1.closePgeLogger(handler1);
+      List<String> messages = FileUtils.readLines(
+            new File(tmpDir1, "logs").listFiles()[0], "UTF-8");
+      assertEquals("INFO: pge1 message1", messages.get(1));
+      assertEquals("INFO: pge1 message2", messages.get(3));
+      assertEquals("INFO: pge1 message3", messages.get(5));
+
+      final String PGE_2_NAME = "PGE2";
+      PGETaskInstance pgeTask2 = new PGETaskInstance();
+      pgeTask2.pgeMetadata = new PgeMetadata();
+      pgeTask2.pgeMetadata.replaceMetadata(NAME, PGE_2_NAME);
+      pgeTask2.pgeConfig = new PgeConfig();
+      pgeTask2.pgeConfig.setExeDir(tmpDir2.getAbsolutePath());
+      Handler handler2 = pgeTask2.initializePgeLogger();
+      pgeTask2.log(Level.SEVERE, "pge2 message1");
+      pgeTask2.closePgeLogger(handler2);
+      messages = FileUtils.readLines(new File(tmpDir2, "logs").listFiles()[0],
+            "UTF-8");
+      assertEquals("SEVERE: pge2 message1", messages.get(1));
+
+      PGETaskInstance pgeTask3 = new PGETaskInstance() {
+         @Override
+         protected LoggerOuputStream createStdOutLogger() {
+            return new LoggerOuputStream(Level.INFO, 10);
+         }
+      };
+      pgeTask3.pgeMetadata = new PgeMetadata();
+      pgeTask3.pgeMetadata.replaceMetadata(NAME, "TestPGE");
+      pgeTask3.pgeConfig = new PgeConfig();
+      pgeTask3.pgeConfig.setExeDir(tmpDir3.getAbsolutePath());
+      Handler handler3 = pgeTask3.initializePgeLogger();
+      LoggerOuputStream los = pgeTask3.createStdOutLogger();
+      los.write("This is a test write to a log file".getBytes());
+      los.close();
+      pgeTask3.closePgeLogger(handler3);
+      messages = FileUtils.readLines(new File(tmpDir3, "logs").listFiles()[0],
+            "UTF-8");
+      assertEquals(8, messages.size());
+      assertEquals("INFO: This is a ", messages.get(1));
+      assertEquals("INFO: test write", messages.get(3));
+      assertEquals("INFO:  to a log ", messages.get(5));
+      assertEquals("INFO: file", messages.get(7));
+
+      FileUtils.forceDelete(tmpDir1);
+      FileUtils.forceDelete(tmpDir2);
+      FileUtils.forceDelete(tmpDir3);
+   }
 }

Added: 
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java
URL: 
http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java?rev=1301762&view=auto
==============================================================================
--- oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java (added)
+++ oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java Fri Mar 16 21:09:30 2012
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge.logging;
+
+//JDK imports
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//Google imports
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+//JUnit imports
+import junit.framework.TestCase;
+
+/**
+ * Test class for {@link PgeLogHandler}.
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class TestPgeLogHandler extends TestCase {
+
+   public void testSeparatesMultipleCasPgeLogWritesByPgeName() throws 
SecurityException, FileNotFoundException {
+      final String PGE_1_NAME = "PGE1";
+      final String PGE_2_NAME = "PGE2";
+      
+      final StringBuffer pge1LogMessages = new StringBuffer("");
+      PgeLogHandler handler1 = new PgeLogHandler(PGE_1_NAME, new 
OutputStream() {
+         @Override
+         public void write(int character) throws IOException {
+            pge1LogMessages.append((char) character);
+         }
+      });
+      final StringBuffer pge2LogMessages = new StringBuffer("");
+      PgeLogHandler handler2 = new PgeLogHandler(PGE_2_NAME, new 
OutputStream() {
+         @Override
+         public void write(int character) throws IOException {
+            pge2LogMessages.append((char) character);
+         }
+      });
+
+      Logger logger = Logger.getLogger(TestPgeLogHandler.class.getName());
+      logger.addHandler(handler1);
+      logger.addHandler(handler2);
+
+      logger.log(new PgeLogRecord(PGE_1_NAME, Level.INFO, "pge1 message1"));
+      logger.log(new PgeLogRecord(PGE_1_NAME, Level.INFO, "pge1 message2"));
+      logger.log(new PgeLogRecord(PGE_1_NAME, Level.INFO, "pge1 message3"));
+
+      logger.log(new PgeLogRecord(PGE_2_NAME, Level.INFO, "pge2 message1"));
+
+      handler1.close();
+      handler2.close();
+
+      assertFalse(pge1LogMessages.toString().isEmpty());
+      List<String> messages = Lists.newArrayList(Splitter.on("\n")
+            .omitEmptyStrings().split(pge1LogMessages.toString()));
+      assertEquals(6, messages.size());
+      assertEquals("INFO: pge1 message1", messages.get(1));
+      assertEquals("INFO: pge1 message2", messages.get(3));
+      assertEquals("INFO: pge1 message3", messages.get(5));
+
+      assertFalse(pge2LogMessages.toString().isEmpty());
+      messages = Lists.newArrayList(Splitter.on("\n")
+            .omitEmptyStrings().split(pge2LogMessages.toString()));
+      assertEquals(2, messages.size());
+      assertEquals("INFO: pge2 message1", messages.get(1));
+   }
+}

Propchange: 
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain


Reply via email to