Author: bfoster
Date: Fri Mar 16 21:09:30 2012
New Revision: 1301762
URL: http://svn.apache.org/viewvc?rev=1301762&view=rev
Log:
- Created Java Logger Handler for CAS-PGE... This handler will allow for
multiple CAS-PGE instances to exist in the same JVM while routing the logging
from each to their own log files in their execution directory
- adding support for PgeMetadata to take PgeTaskMetKeys directly
- some cleanup and reorganizing of PgeTaskInstance
-------------
OODT-414
Added:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java
(with props)
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java
(with props)
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java
(with props)
Modified:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
Modified:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java?rev=1301762&r1=1301761&r2=1301762&view=diff
==============================================================================
--- oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
(original)
+++ oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/PGETaskInstance.java
Fri Mar 16 21:09:30 2012
@@ -24,7 +24,6 @@ import static org.apache.oodt.cas.pge.me
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.CRAWLER_RECUR;
import static
org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.INGEST_CLIENT_TRANSFER_SERVICE_FACTORY;
import static
org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.INGEST_FILE_MANAGER_URL;
-import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.LOG_FILE_PATTERN;
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.MET_FILE_EXT;
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.NAME;
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.PGE_RUNTIME;
@@ -40,17 +39,18 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.URL;
+import java.nio.CharBuffer;
import java.util.LinkedList;
import java.util.List;
-import java.util.logging.FileHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;
-import java.util.logging.SimpleFormatter;
import java.util.regex.Pattern;
//OODT imports
+import org.apache.commons.lang.Validate;
import org.apache.oodt.cas.crawl.ProductCrawler;
import org.apache.oodt.cas.crawl.StdProductCrawler;
import org.apache.oodt.cas.crawl.status.IngestStatus;
@@ -63,6 +63,8 @@ import org.apache.oodt.cas.pge.config.Pg
import org.apache.oodt.cas.pge.config.RegExprOutputFiles;
import org.apache.oodt.cas.pge.config.RenamingConv;
import org.apache.oodt.cas.pge.config.XmlFilePgeConfigBuilder;
+import org.apache.oodt.cas.pge.logging.PgeLogHandler;
+import org.apache.oodt.cas.pge.logging.PgeLogRecord;
import org.apache.oodt.cas.pge.metadata.PgeMetadata;
import org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys;
import org.apache.oodt.cas.pge.writers.PcsMetFileWriter;
@@ -91,69 +93,74 @@ import com.google.common.collect.Lists;
*/
public class PGETaskInstance implements WorkflowTaskInstance {
- protected static Logger LOG = Logger.getLogger(PGETaskInstance.class
- .getName());
-
protected XmlRpcWorkflowManagerClient wm;
-
protected String workflowInstId;
-
protected PgeMetadata pgeMetadata;
-
protected PgeConfig pgeConfig;
- protected PGETaskInstance() {
+ protected PGETaskInstance() {}
+
+ protected void updateStatus(String status) throws Exception {
+ log(Level.INFO, "Updating status to workflow as [" + status + "]");
+ wm.updateWorkflowInstanceStatus(workflowInstId, status);
}
- protected void initialize(Metadata metadata, WorkflowTaskConfiguration
config)
- throws InstantiationException {
- try {
- // merge metadata
- this.pgeMetadata = createPgeMetadata(metadata, config);
+ protected Handler initializePgeLogger() throws Exception {
+ File logDir = new File(pgeConfig.getExeDir(), "logs");
+ if (!logDir.mkdirs()) {
+ throw new Exception("mkdirs for logs directory return false");
+ }
+ Handler handler = new PgeLogHandler(pgeMetadata.getMetadata(NAME),
+ new FileOutputStream(new File(logDir, createLogFileName())));
+ Logger.getLogger(PGETaskInstance.class.getName()).addHandler(handler);
+ return handler;
+ }
- // create PgeConfig
- this.pgeConfig = this.createPgeConfig();
+ protected String createLogFileName() {
+ return pgeMetadata.getMetadata(NAME) + "." + System.currentTimeMillis()
+ + ".log";
+ }
- // run Property Adders.
- runPropertyAdders();
+ protected void closePgeLogger(Handler handler) {
+ handler.close();
+ Logger.getLogger(PGETaskInstance.class.getName()).removeHandler(handler);
+ }
- // configure workflow manager
- wm = new XmlRpcWorkflowManagerClient(new URL(
- this.pgeMetadata
- .getMetadata(WORKFLOW_MANAGER_URL.getName())));
- workflowInstId = this.pgeMetadata
- .getMetadata(CoreMetKeys.WORKFLOW_INST_ID);
+ protected void log(Level level, String message) {
+ Logger.getLogger(PGETaskInstance.class.getName()).log(
+ new PgeLogRecord(pgeMetadata.getMetadata(NAME), level, message));
+ }
- } catch (Exception e) {
- e.printStackTrace();
- throw new InstantiationException(
- "Failed to instanciate PGETaskInstance : " + e.getMessage());
- }
+ protected void log(Level level, String message, Throwable t) {
+ Logger.getLogger(PGETaskInstance.class.getName()).log(
+ new PgeLogRecord(pgeMetadata.getMetadata(NAME), level, message,
t));
}
protected PgeMetadata createPgeMetadata(Metadata dynMetadata,
- WorkflowTaskConfiguration config) {
+ WorkflowTaskConfiguration config) throws Exception {
Metadata staticMetadata = new Metadata();
- for (Object key : config.getProperties().keySet()) {
- PgeTaskMetKeys metKey = PgeTaskMetKeys.getByName((String) key);
+ for (Object objKey : config.getProperties().keySet()) {
+ String key = (String) objKey;
+ PgeTaskMetKeys metKey = PgeTaskMetKeys.getByName(key);
if (metKey != null && metKey.isVector()) {
- staticMetadata.addMetadata(
- (String) key,
+ staticMetadata.addMetadata(key,
Lists.newArrayList(Splitter.on(",").trimResults()
.omitEmptyStrings()
- .split(config.getProperty((String) key))));
+ .split(config.getProperty(key))));
} else {
- staticMetadata.addMetadata((String) key,
- config.getProperty((String) key));
+ staticMetadata.addMetadata(key, config.getProperty(key));
}
}
return new PgeMetadata(staticMetadata, dynMetadata);
}
+ protected PgeConfig createPgeConfig() throws Exception {
+ return new XmlFilePgeConfigBuilder().build(pgeMetadata);
+ }
+
protected void runPropertyAdders() throws Exception {
try {
- List<String> propertyAdders = pgeMetadata
- .getAllMetadata(PROPERTY_ADDERS.getName());
+ List<String> propertyAdders =
pgeMetadata.getAllMetadata(PROPERTY_ADDERS);
if (propertyAdders != null) {
for (String propertyAdder : propertyAdders) {
runPropertyAdder(loadPropertyAdder(propertyAdder));
@@ -171,71 +178,79 @@ public class PGETaskInstance implements
.newInstance();
}
- protected void runPropertyAdder(ConfigFilePropertyAdder propAdder) {
- propAdder.addConfigProperties(this.pgeMetadata,
- this.pgeConfig.getPropertyAdderCustomArgs());
- }
-
- protected PgeConfig createPgeConfig() throws Exception {
- return new XmlFilePgeConfigBuilder().build(this.pgeMetadata);
+ protected void runPropertyAdder(ConfigFilePropertyAdder propAdder)
+ throws Exception {
+ propAdder.addConfigProperties(pgeMetadata,
+ pgeConfig.getPropertyAdderCustomArgs());
}
- protected void updateStatus(String status) {
- try {
- LOG.log(Level.INFO, "Updating status to workflow as " + status);
- this.wm.updateWorkflowInstanceStatus(this.workflowInstId, status);
- } catch (Exception e) {
- LOG.log(Level.WARNING, "Failed to update to workflow as " + status
- + " : " + e.getMessage());
- }
+ protected XmlRpcWorkflowManagerClient createWorkflowManagerClient()
+ throws Exception {
+ String urlString = pgeMetadata.getMetadata(WORKFLOW_MANAGER_URL);
+ Validate.notNull(urlString, "Must specify " + WORKFLOW_MANAGER_URL);
+ return new XmlRpcWorkflowManagerClient(new URL(urlString));
}
- protected void prePgeSetup() throws Exception {
- this.createExeDir();
- this.createOuputDirsIfRequested();
- this.createSciPgeConfigFiles();
+ protected String getWorkflowInstanceId() throws Exception {
+ String instanceId =
pgeMetadata.getMetadata(CoreMetKeys.WORKFLOW_INST_ID);
+ Validate.notNull(instanceId, "Must specify "
+ + CoreMetKeys.WORKFLOW_INST_ID);
+ return instanceId;
}
- protected void createExeDir() {
- LOG.log(Level.INFO, "Creating PGE execution working directory: ["
- + this.pgeConfig.getExeDir() + "]");
- new File(this.pgeConfig.getExeDir()).mkdirs();
+ protected void createExeDir() throws Exception {
+ log(Level.INFO, "Creating PGE execution working directory: ["
+ + pgeConfig.getExeDir() + "]");
+ if (!new File(pgeConfig.getExeDir()).mkdirs()) {
+ throw new Exception("mkdirs returned false for creating ["
+ + pgeConfig.getExeDir() + "]");
+ }
}
- protected void createOuputDirsIfRequested() {
- for (OutputDir outputDir : this.pgeConfig.getOuputDirs()) {
+ protected void createOuputDirsIfRequested() throws Exception {
+ for (OutputDir outputDir : pgeConfig.getOuputDirs()) {
if (outputDir.isCreateBeforeExe()) {
- LOG.log(Level.INFO, "Creating PGE file ouput directory: ["
+ log(Level.INFO, "Creating PGE file ouput directory: ["
+ outputDir.getPath() + "]");
- new File(outputDir.getPath()).mkdirs();
+ if (!new File(outputDir.getPath()).mkdirs()) {
+ throw new Exception("mkdir returned false for creating ["
+ + outputDir.getPath() + "]");
+ }
}
}
}
- protected void createSciPgeConfigFiles() throws IOException {
- this.updateStatus(CONF_FILE_BUILD.getWorkflowStatusName());
+ protected void createSciPgeConfigFiles() throws Exception {
+ log(Level.INFO, "Starting creation of science PGE files...");
for (DynamicConfigFile dynamicConfigFile : pgeConfig
.getDynamicConfigFiles()) {
- try {
- this.createSciPgeConfigFile(dynamicConfigFile);
- } catch (Exception e) {
- e.printStackTrace();
- throw new IOException("Failed to created pge input config file ' "
- + dynamicConfigFile.getFilePath() + "' : " + e.getMessage());
- }
+ createSciPgeConfigFile(dynamicConfigFile);
}
+ log(Level.INFO, "Successfully wrote all science PGE files!");
}
protected void createSciPgeConfigFile(DynamicConfigFile dynamicConfigFile)
throws Exception {
+ Validate.notNull(dynamicConfigFile, "dynamicConfigFile cannot be null");
+ log(Level.INFO, "Starting creation of science PGE file [" +
dynamicConfigFile.getFilePath() + "]...");
+
+ // Create parent directory if it doesn't exist.
File parentDir = new File(dynamicConfigFile.getFilePath())
.getParentFile();
- if (!parentDir.exists())
+ if (!parentDir.exists()) {
parentDir.mkdirs();
+ }
+
+ // Load writer and write file.
+ log(Level.INFO, "Loading writer class for science PGE file [" +
dynamicConfigFile.getFilePath() + "]...");
SciPgeConfigFileWriter writer = (SciPgeConfigFileWriter) Class.forName(
dynamicConfigFile.getWriterClass()).newInstance();
+ log(Level.INFO, "Loaded writer [" + writer.getClass().getCanonicalName()
+ + "] for science PGE file [" + dynamicConfigFile.getFilePath()
+ + "]...");
+ log(Level.INFO, "Writing science PGE file [" +
dynamicConfigFile.getFilePath() + "]...");
writer.createConfigFile(dynamicConfigFile.getFilePath(),
- this.pgeMetadata.asMetadata(), dynamicConfigFile.getArgs());
+ pgeMetadata.asMetadata(), dynamicConfigFile.getArgs());
}
protected void processOutput() throws FileNotFoundException, IOException {
@@ -259,7 +274,7 @@ public class PGETaskInstance implements
: createdFile, writer, regExprFiles
.getArgs()));
} catch (Exception e) {
- LOG.log(Level.SEVERE,
+ log(Level.SEVERE,
"Failed to create metadata file for '" + createdFile
+ "' : " + e.getMessage(), e);
}
@@ -270,8 +285,7 @@ public class PGETaskInstance implements
outputMetadata,
createdFile.getAbsolutePath()
+ "."
- + this.pgeMetadata
- .getMetadata(MET_FILE_EXT.getName()));
+ + pgeMetadata.getMetadata(MET_FILE_EXT));
}
}
}
@@ -283,7 +297,7 @@ public class PGETaskInstance implements
String newFileName = PathUtils.doDynamicReplacement(
renamingConv.getRenamingString(), curMetadata);
File newFile = new File(file.getParentFile(), newFileName);
- LOG.log(Level.INFO, "Renaming file '" + file.getAbsolutePath() + "' to '"
+ log(Level.INFO, "Renaming file '" + file.getAbsolutePath() + "' to '"
+ newFile.getAbsolutePath() + "'");
if (!file.renameTo(newFile))
throw new IOException("Renaming returned false");
@@ -314,50 +328,13 @@ public class PGETaskInstance implements
}
protected String getPgeScriptName() {
- return "sciPgeExeScript_"
- + this.pgeMetadata.getMetadata(NAME.getName());
- }
-
- protected Handler initializePgeLogHandler() throws SecurityException,
- IOException {
- FileHandler handler = null;
- String logFilePattern = this.pgeMetadata
- .getMetadata(LOG_FILE_PATTERN.getName());
- if (logFilePattern != null) {
- LOG.log(Level.INFO,
- "Creating Log Handler to capture pge output to file '"
- + logFilePattern + "'");
- new File(logFilePattern).getParentFile().mkdirs();
- handler = new FileHandler(logFilePattern);
- handler.setFormatter(new SimpleFormatter());
-
- }
- return handler;
- }
-
- protected Logger initializePgeLogger(Handler handler) {
- if (handler != null) {
- Logger pgeLogger = Logger.getLogger(this.pgeMetadata
- .getMetadata(NAME.getName())
- + System.currentTimeMillis());
- pgeLogger.addHandler(handler);
- return pgeLogger;
- } else {
- return LOG;
- }
- }
-
- protected void closePgeLogHandler(Logger logger, Handler handler) {
- if (logger != null && handler != null) {
- logger.removeHandler(handler);
- handler.close();
- }
+ return "sciPgeExeScript_" + this.pgeMetadata.getMetadata(NAME);
}
protected void runPge() throws Exception {
ScriptFile sf = null;
- Handler handler = null;
- Logger pgeLogger = null;
+ OutputStream stdOS = createStdOutLogger();
+ OutputStream errOS = createStdErrLogger();
try {
long startTime = System.currentTimeMillis();
@@ -366,32 +343,37 @@ public class PGETaskInstance implements
sf.writeScriptFile(this.getScriptPath());
// run script and evaluate whether success or failure
- handler = this.initializePgeLogHandler();
- pgeLogger = this.initializePgeLogger(handler);
this.updateStatus(RUNNING_PGE.getWorkflowStatusName());
if (!this.wasPgeSuccessful(ExecUtils.callProgram(
this.pgeConfig.getShellType() + " " + this.getScriptPath(),
- pgeLogger,
+ stdOS, errOS,
new File(this.pgeConfig.getExeDir()).getAbsoluteFile())))
throw new RuntimeException("Pge didn't finish successfully");
else
- LOG.log(Level.INFO,
+ log(Level.INFO,
"Successfully completed running: '" + sf.getCommands() +
"'");
long endTime = System.currentTimeMillis();
- this.pgeMetadata.replaceMetadata(PGE_RUNTIME.getName(),
- (endTime - startTime) + "");
+ pgeMetadata.replaceMetadata(PGE_RUNTIME, (endTime - startTime) + "");
} catch (Exception e) {
- e.printStackTrace();
throw new Exception("Exception when executing PGE commands '"
+ (sf != null ? sf.getCommands() : "NULL") + "' : "
- + e.getMessage());
+ + e.getMessage(), e);
} finally {
- this.closePgeLogHandler(pgeLogger, handler);
+ try { stdOS.close(); } catch (Exception e) {}
+ try { errOS.close(); } catch (Exception e) {}
}
}
+ protected LoggerOuputStream createStdOutLogger() {
+ return new LoggerOuputStream(Level.INFO);
+ }
+
+ protected LoggerOuputStream createStdErrLogger() {
+ return new LoggerOuputStream(Level.SEVERE);
+ }
+
protected boolean wasPgeSuccessful(int returnCode) {
return returnCode == 0;
}
@@ -411,36 +393,31 @@ public class PGETaskInstance implements
protected void setCrawlerConfigurations(StdProductCrawler crawler)
throws Exception {
- crawler.setMetFileExtension(this.pgeMetadata
- .getMetadata(MET_FILE_EXT.getName()));
- crawler.setClientTransferer(this.pgeMetadata
- .getMetadata(INGEST_CLIENT_TRANSFER_SERVICE_FACTORY.getName()));
- crawler.setFilemgrUrl(this.pgeMetadata
- .getMetadata(INGEST_FILE_MANAGER_URL.getName()));
- String actionRepoFile = this.pgeMetadata
- .getMetadata(ACTION_REPO_FILE.getName());
+ crawler.setMetFileExtension(pgeMetadata.getMetadata(MET_FILE_EXT));
+ crawler.setClientTransferer(pgeMetadata
+ .getMetadata(INGEST_CLIENT_TRANSFER_SERVICE_FACTORY));
+ crawler.setFilemgrUrl(pgeMetadata.getMetadata(INGEST_FILE_MANAGER_URL));
+ String actionRepoFile = pgeMetadata.getMetadata(ACTION_REPO_FILE);
if (actionRepoFile != null && !actionRepoFile.equals("")) {
crawler.setApplicationContext(new FileSystemXmlApplicationContext(
actionRepoFile));
- List<String> actionIds = pgeMetadata
- .getAllMetadata(ACTION_IDS.getName());
+ List<String> actionIds = pgeMetadata.getAllMetadata(ACTION_IDS);
if (actionIds != null) {
crawler.setActionIds(actionIds);
}
}
- crawler.setRequiredMetadata(this.pgeMetadata
- .getAllMetadata(REQUIRED_METADATA.getName()));
- String crawlForDirsString = this.pgeMetadata
- .getMetadata(CRAWLER_CRAWL_FOR_DIRS.getName());
+ crawler.setRequiredMetadata(
+ pgeMetadata.getAllMetadata(REQUIRED_METADATA));
+ String crawlForDirsString = pgeMetadata
+ .getMetadata(CRAWLER_CRAWL_FOR_DIRS);
boolean crawlForDirs = (crawlForDirsString != null) ? crawlForDirsString
.toLowerCase().equals("true") : false;
- String recurString = this.pgeMetadata
- .getMetadata(CRAWLER_RECUR.getName());
+ String recurString = pgeMetadata.getMetadata(CRAWLER_RECUR);
boolean recur = (recurString != null) ? recurString.toLowerCase().equals(
"true") : true;
crawler.setCrawlForDirs(crawlForDirs);
crawler.setNoRecur(!recur);
- LOG.log(Level.INFO,
+ log(Level.INFO,
"Passing Workflow Metadata to CAS-Crawler as global metadata . .
.");
crawler.setGlobalMetadata(this.pgeMetadata
.asMetadata(PgeMetadata.Type.DYNAMIC));
@@ -451,11 +428,11 @@ public class PGETaskInstance implements
File currentDir = null;
try {
this.updateStatus(CRAWLING.getWorkflowStatusName());
- boolean attemptIngestAll = Boolean.parseBoolean(this.pgeMetadata
- .getMetadata(ATTEMPT_INGEST_ALL.getName()));
+ boolean attemptIngestAll = Boolean.parseBoolean(pgeMetadata
+ .getMetadata(ATTEMPT_INGEST_ALL));
for (File crawlDir : crawlDirs) {
currentDir = crawlDir;
- LOG.log(Level.INFO, "Executing StdProductCrawler in productPath: ["
+ log(Level.INFO, "Executing StdProductCrawler in productPath: ["
+ crawlDir + "]");
crawler.crawl(crawlDir);
if (!attemptIngestAll)
@@ -464,9 +441,8 @@ public class PGETaskInstance implements
if (attemptIngestAll)
this.verifyIngests(crawler);
} catch (Exception e) {
- LOG.log(
- Level.WARNING,
- "Failed while attempting to ingest products while crawling
directory '"
+ log(Level.WARNING,
+ "Failed while attempting to ingest products while crawling
directory '"
+ currentDir
+ "' (all products may not have been ingested) : "
+ e.getMessage(), e);
@@ -484,7 +460,7 @@ public class PGETaskInstance implements
+ status.getResult() + "',msg='" + status.getMessage() +
"']";
ingestsSuccess = false;
} else if (!status.getResult().equals(IngestStatus.Result.SUCCESS)) {
- LOG.log(Level.WARNING, "Product was not ingested [file='"
+ log(Level.WARNING, "Product was not ingested [file='"
+ status.getProduct().getAbsolutePath() + "',result='"
+ status.getResult() + "',msg='" + status.getMessage() +
"']");
}
@@ -494,21 +470,90 @@ public class PGETaskInstance implements
}
protected void updateDynamicMetadata() {
- this.pgeMetadata.commitMarkedDynamicMetadataKeys();
+ pgeMetadata.commitMarkedDynamicMetadataKeys();
}
public void run(Metadata metadata, WorkflowTaskConfiguration config)
throws WorkflowTaskInstanceException {
+ Handler handler = null;
try {
- this.initialize(metadata, config);
- this.prePgeSetup();
- this.runPge();
- this.processOutput();
- this.updateDynamicMetadata();
- this.ingestProducts();
+ // Initialize CAS-PGE.
+ pgeMetadata = createPgeMetadata(metadata, config);
+ pgeConfig = createPgeConfig();
+ runPropertyAdders();
+ wm = createWorkflowManagerClient();
+ workflowInstId = getWorkflowInstanceId();
+
+ // Initialize Logger.
+ handler = initializePgeLogger();
+
+ // Setup the PGE.
+ createExeDir();
+ createOuputDirsIfRequested();
+ updateStatus(CONF_FILE_BUILD.getWorkflowStatusName());
+ createSciPgeConfigFiles();
+
+ // Run the PGE and proccess it data.
+ runPge();
+
+ // Update metadata.
+ processOutput();
+ updateDynamicMetadata();
+
+ // Inject products.
+ ingestProducts();
} catch (Exception e) {
throw new WorkflowTaskInstanceException("PGETask failed : "
+ e.getMessage(), e);
+ } finally {
+ closePgeLogger(handler);
+ }
+ }
+
+ /**
+ * OutputStream which wraps {@link PGETaskInstance}'s
+ * {@link PGETaskInstance#log(Level, String)} method.
+ *
+ * @author bfoster (Brian Foster)
+ */
+ protected class LoggerOuputStream extends OutputStream {
+
+ private CharBuffer buffer;
+ private Level level;
+
+ public LoggerOuputStream(Level level) {
+ this(level, 512);
+ }
+
+ public LoggerOuputStream(Level level, int bufferSize) {
+ this.level = level;
+ buffer = CharBuffer.wrap(new char[bufferSize]);
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (!buffer.hasRemaining()) {
+ flush();
+ }
+ buffer.put((char) b);
+ }
+
+ @Override
+ public void flush() {
+ System.out.println("HELLO");
+ if (buffer.position() > 0) {
+ char[] flushContext = new char[buffer.position()];
+ System.arraycopy(buffer.array(), 0, flushContext, 0,
+ buffer.position());
+ log(level, new String(flushContext));
+ buffer.clear();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ flush();
+ super.close();
}
}
}
Added:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java?rev=1301762&view=auto
==============================================================================
---
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java
(added)
+++
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java
Fri Mar 16 21:09:30 2012
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge.logging;
+
+//JDK imports
+import java.io.FileNotFoundException;
+import java.io.OutputStream;
+import java.util.logging.LogRecord;
+import java.util.logging.SimpleFormatter;
+import java.util.logging.StreamHandler;
+
+/**
+ * Log Handler which only writes logs for a given CAS-PGE name. This allows
for
+ * multiple CAS-PGE instances to exist in the same JVM but still create their
+ * own log files.
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class PgeLogHandler extends StreamHandler {
+
+ private String pgeName;
+
+ public PgeLogHandler(String pgeName, OutputStream os)
+ throws SecurityException, FileNotFoundException {
+ super(os, new SimpleFormatter());
+ this.pgeName = pgeName;
+ }
+
+ @Override
+ public void publish(LogRecord record) {
+ if (record instanceof PgeLogRecord
+ && pgeName.equals(((PgeLogRecord) record).getPgeName())) {
+ super.publish(record);
+ }
+ }
+}
Propchange:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Added:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java?rev=1301762&view=auto
==============================================================================
---
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java
(added)
+++
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java
Fri Mar 16 21:09:30 2012
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge.logging;
+
+//JDK imports
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+
+/**
+ * CAS-PGE's {@link LogRecord}.
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class PgeLogRecord extends LogRecord {
+
+ private static final long serialVersionUID = 2334166761035931387L;
+
+ private String pgeName;
+
+ public PgeLogRecord(String pgeName, Level level, String msg) {
+ super(level, msg);
+ this.pgeName = pgeName;
+ }
+
+ public PgeLogRecord(String pgeName, Level level, String msg, Throwable t) {
+ this(pgeName, level, msg);
+ setThrown(t);
+ }
+
+ public String getPgeName() {
+ return pgeName;
+ }
+}
Propchange:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/logging/PgeLogRecord.java
------------------------------------------------------------------------------
svn:mime-type = text/plain
Modified:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java?rev=1301762&r1=1301761&r2=1301762&view=diff
==============================================================================
---
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java
(original)
+++
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeMetadata.java
Fri Mar 16 21:09:30 2012
@@ -296,6 +296,12 @@ public class PgeMetadata {
return keyPath;
}
+ public void replaceMetadata(PgeTaskMetKeys key, String value) {
+ Validate.notNull(key, "key cannot be null");
+
+ replaceMetadata(key.getName(), value);
+ }
+
/**
* Replace the given key's value with the given value. If the given key is a
* key link, then it will update the value of the key it is linked to if
that
@@ -338,6 +344,12 @@ public class PgeMetadata {
}
}
+ public void replaceMetadata(PgeTaskMetKeys key, List<String> values) {
+ Validate.notNull(key, "key cannot be null");
+
+ replaceMetadata(key.getName(), values);
+ }
+
/**
* Replace the given key's values with the given values. If the given key is
* a key link, then it will update the values of the key it is linked to if
@@ -407,6 +419,10 @@ public class PgeMetadata {
return combinedMetadata;
}
+ public List<String> getAllMetadata(PgeTaskMetKeys key, Type... types) {
+ return getAllMetadata(key.getName(), types);
+ }
+
/**
* Get metadata values for given key. If Types are specified then it
provides
* the precedence order in which to search for the key. If no Type args are
@@ -452,6 +468,10 @@ public class PgeMetadata {
return null;
}
+ public String getMetadata(PgeTaskMetKeys key, Type... types) {
+ return getMetadata(key.getName(), types);
+ }
+
/**
* Returns the first value returned by {@link #getAllMetadata(String,
Type...)}, if it returns
* null then this method will also return null.
Modified:
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java?rev=1301762&r1=1301761&r2=1301762&view=diff
==============================================================================
---
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java
(original)
+++
oodt/trunk/pge/src/main/java/org/apache/oodt/cas/pge/metadata/PgeTaskMetKeys.java
Fri Mar 16 21:09:30 2012
@@ -39,12 +39,6 @@ public enum PgeTaskMetKeys {
"PGETask/ConfigFilePath",
"PGETask_ConfigFilePath"),
/**
- * Java logger pattern used to create log files for CAS-PGE.
- */
- LOG_FILE_PATTERN(
- "PGETask/LogFilePattern",
- "PGETask_LogFilePattern"),
- /**
* List of {@link ConfigFilePropertyAdder}s classpaths to be run.
*/
PROPERTY_ADDERS(
@@ -166,4 +160,9 @@ public enum PgeTaskMetKeys {
}
return null;
}
+
+ @Override
+ public String toString() {
+ return getName();
+ }
}
Modified:
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java?rev=1301762&r1=1301761&r2=1301762&view=diff
==============================================================================
--- oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
(original)
+++ oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/TestPGETaskInstance.java
Fri Mar 16 21:09:30 2012
@@ -17,11 +17,23 @@
package org.apache.oodt.cas.pge;
//OODT static imports
+import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.NAME;
import static org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys.PROPERTY_ADDERS;
+//JDK imports
+import java.io.File;
+import java.util.List;
+import java.util.UUID;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+
+//Apache imports
+import org.apache.commons.io.FileUtils;
+
//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.pge.PGETaskInstance;
+import org.apache.oodt.cas.pge.PGETaskInstance.LoggerOuputStream;
import org.apache.oodt.cas.pge.config.PgeConfig;
import org.apache.oodt.cas.pge.metadata.PgeMetadata;
import org.apache.oodt.cas.pge.metadata.PgeTaskMetKeys;
@@ -93,7 +105,7 @@ public class TestPGETaskInstance extends
.getMetadata(MockConfigFilePropertyAdder.RUN_COUNTER));
}
- public void testCreatePgeMetadata() {
+ public void testCreatePgeMetadata() throws Exception {
final String PGE_NAME = "PGE_Test";
final String PGE_REQUIRED_METADATA = "Filename, FileLocation ";
final String PROP_ADDERS =
"some.prop.adder.classpath,some.other.classpath";
@@ -143,4 +155,73 @@ public class TestPGETaskInstance extends
pgeMet.getAllMetadata(PgeTaskMetKeys.PROPERTY_ADDERS.getName())
.get(0));
}
+
+ public void testLogger() throws Exception {
+ File tmpFile = File.createTempFile("bogus", "bogus");
+ File tmpDir = tmpFile.getParentFile();
+ tmpFile.delete();
+ File tmpDir1 = new File(tmpDir, UUID.randomUUID().toString());
+ assertTrue(tmpDir1.mkdirs());
+ File tmpDir2 = new File(tmpDir, UUID.randomUUID().toString());
+ assertTrue(tmpDir2.mkdirs());
+ File tmpDir3 = new File(tmpDir, UUID.randomUUID().toString());
+ assertTrue(tmpDir3.mkdirs());
+
+ final String PGE_1_NAME = "PGE1";
+ PGETaskInstance pgeTask1 = new PGETaskInstance();
+ pgeTask1.pgeMetadata = new PgeMetadata();
+ pgeTask1.pgeMetadata.replaceMetadata(NAME, PGE_1_NAME);
+ pgeTask1.pgeConfig = new PgeConfig();
+ pgeTask1.pgeConfig.setExeDir(tmpDir1.getAbsolutePath());
+ Handler handler1 = pgeTask1.initializePgeLogger();
+ pgeTask1.log(Level.INFO, "pge1 message1");
+ pgeTask1.log(Level.INFO, "pge1 message2");
+ pgeTask1.log(Level.INFO, "pge1 message3");
+ pgeTask1.closePgeLogger(handler1);
+ List<String> messages = FileUtils.readLines(
+ new File(tmpDir1, "logs").listFiles()[0], "UTF-8");
+ assertEquals("INFO: pge1 message1", messages.get(1));
+ assertEquals("INFO: pge1 message2", messages.get(3));
+ assertEquals("INFO: pge1 message3", messages.get(5));
+
+ final String PGE_2_NAME = "PGE2";
+ PGETaskInstance pgeTask2 = new PGETaskInstance();
+ pgeTask2.pgeMetadata = new PgeMetadata();
+ pgeTask2.pgeMetadata.replaceMetadata(NAME, PGE_2_NAME);
+ pgeTask2.pgeConfig = new PgeConfig();
+ pgeTask2.pgeConfig.setExeDir(tmpDir2.getAbsolutePath());
+ Handler handler2 = pgeTask2.initializePgeLogger();
+ pgeTask2.log(Level.SEVERE, "pge2 message1");
+ pgeTask2.closePgeLogger(handler2);
+ messages = FileUtils.readLines(new File(tmpDir2, "logs").listFiles()[0],
+ "UTF-8");
+ assertEquals("SEVERE: pge2 message1", messages.get(1));
+
+ PGETaskInstance pgeTask3 = new PGETaskInstance() {
+ @Override
+ protected LoggerOuputStream createStdOutLogger() {
+ return new LoggerOuputStream(Level.INFO, 10);
+ }
+ };
+ pgeTask3.pgeMetadata = new PgeMetadata();
+ pgeTask3.pgeMetadata.replaceMetadata(NAME, "TestPGE");
+ pgeTask3.pgeConfig = new PgeConfig();
+ pgeTask3.pgeConfig.setExeDir(tmpDir3.getAbsolutePath());
+ Handler handler3 = pgeTask3.initializePgeLogger();
+ LoggerOuputStream los = pgeTask3.createStdOutLogger();
+ los.write("This is a test write to a log file".getBytes());
+ los.close();
+ pgeTask3.closePgeLogger(handler3);
+ messages = FileUtils.readLines(new File(tmpDir3, "logs").listFiles()[0],
+ "UTF-8");
+ assertEquals(8, messages.size());
+ assertEquals("INFO: This is a ", messages.get(1));
+ assertEquals("INFO: test write", messages.get(3));
+ assertEquals("INFO: to a log ", messages.get(5));
+ assertEquals("INFO: file", messages.get(7));
+
+ FileUtils.forceDelete(tmpDir1);
+ FileUtils.forceDelete(tmpDir2);
+ FileUtils.forceDelete(tmpDir3);
+ }
}
Added:
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java
URL:
http://svn.apache.org/viewvc/oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java?rev=1301762&view=auto
==============================================================================
---
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java
(added)
+++
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java
Fri Mar 16 21:09:30 2012
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.oodt.cas.pge.logging;
+
+//JDK imports
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+//Google imports
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+
+//JUnit imports
+import junit.framework.TestCase;
+
+/**
+ * Test class for {@link PgeLogHandler}.
+ *
+ * @author bfoster (Brian Foster)
+ */
+public class TestPgeLogHandler extends TestCase {
+
+ public void testSeparatesMultipleCasPgeLogWritesByPgeName() throws
SecurityException, FileNotFoundException {
+ final String PGE_1_NAME = "PGE1";
+ final String PGE_2_NAME = "PGE2";
+
+ final StringBuffer pge1LogMessages = new StringBuffer("");
+ PgeLogHandler handler1 = new PgeLogHandler(PGE_1_NAME, new
OutputStream() {
+ @Override
+ public void write(int character) throws IOException {
+ pge1LogMessages.append((char) character);
+ }
+ });
+ final StringBuffer pge2LogMessages = new StringBuffer("");
+ PgeLogHandler handler2 = new PgeLogHandler(PGE_2_NAME, new
OutputStream() {
+ @Override
+ public void write(int character) throws IOException {
+ pge2LogMessages.append((char) character);
+ }
+ });
+
+ Logger logger = Logger.getLogger(TestPgeLogHandler.class.getName());
+ logger.addHandler(handler1);
+ logger.addHandler(handler2);
+
+ logger.log(new PgeLogRecord(PGE_1_NAME, Level.INFO, "pge1 message1"));
+ logger.log(new PgeLogRecord(PGE_1_NAME, Level.INFO, "pge1 message2"));
+ logger.log(new PgeLogRecord(PGE_1_NAME, Level.INFO, "pge1 message3"));
+
+ logger.log(new PgeLogRecord(PGE_2_NAME, Level.INFO, "pge2 message1"));
+
+ handler1.close();
+ handler2.close();
+
+ assertFalse(pge1LogMessages.toString().isEmpty());
+ List<String> messages = Lists.newArrayList(Splitter.on("\n")
+ .omitEmptyStrings().split(pge1LogMessages.toString()));
+ assertEquals(6, messages.size());
+ assertEquals("INFO: pge1 message1", messages.get(1));
+ assertEquals("INFO: pge1 message2", messages.get(3));
+ assertEquals("INFO: pge1 message3", messages.get(5));
+
+ assertFalse(pge2LogMessages.toString().isEmpty());
+ messages = Lists.newArrayList(Splitter.on("\n")
+ .omitEmptyStrings().split(pge2LogMessages.toString()));
+ assertEquals(2, messages.size());
+ assertEquals("INFO: pge2 message1", messages.get(1));
+ }
+}
Propchange:
oodt/trunk/pge/src/test/org/apache/oodt/cas/pge/logging/TestPgeLogHandler.java
------------------------------------------------------------------------------
svn:mime-type = text/plain