http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java deleted file mode 100644 index 9b1e1f4..0000000 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionContext.java +++ /dev/null @@ -1,522 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.workflow; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.GnuParser; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.json.simple.JSONValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.text.DateFormat; -import java.text.SimpleDateFormat; -import java.util.HashMap; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; - - -/** - * Captures the workflow execution context. - */ -public class WorkflowExecutionContext { - - private static final Logger LOG = LoggerFactory.getLogger(WorkflowExecutionContext.class); - - public static final String INSTANCE_FORMAT = "yyyy-MM-dd-HH-mm"; // nominal time - - public static final String OUTPUT_FEED_SEPARATOR = ","; - public static final String INPUT_FEED_SEPARATOR = "#"; - public static final String CLUSTER_NAME_SEPARATOR = ","; - - /** - * Workflow execution status. - */ - public enum Status {WAITING, RUNNING, SUSPENDED, SUCCEEDED, FAILED, TIMEDOUT, KILLED} - - /** - * Workflow execution type. - */ - public enum Type {PRE_PROCESSING, POST_PROCESSING, WORKFLOW_JOB, COORDINATOR_ACTION} - - /** - * Entity operations supported. - */ - public enum EntityOperations { - GENERATE, DELETE, REPLICATE, IMPORT, EXPORT - } - - public static final WorkflowExecutionArgs[] USER_MESSAGE_ARGS = { - WorkflowExecutionArgs.CLUSTER_NAME, - WorkflowExecutionArgs.ENTITY_NAME, - WorkflowExecutionArgs.ENTITY_TYPE, - WorkflowExecutionArgs.NOMINAL_TIME, - WorkflowExecutionArgs.OPERATION, - - WorkflowExecutionArgs.OUTPUT_FEED_NAMES, - WorkflowExecutionArgs.OUTPUT_FEED_PATHS, - - WorkflowExecutionArgs.WORKFLOW_ID, - WorkflowExecutionArgs.WORKFLOW_USER, - WorkflowExecutionArgs.RUN_ID, - WorkflowExecutionArgs.STATUS, - WorkflowExecutionArgs.TIMESTAMP, - WorkflowExecutionArgs.LOG_DIR, - }; - - private final Map<WorkflowExecutionArgs, String> context; - private final long creationTime; - private Configuration actionJobConf; - - public WorkflowExecutionContext(Map<WorkflowExecutionArgs, String> context) { - this.context = context; - creationTime = System.currentTimeMillis(); - } - - public String getValue(WorkflowExecutionArgs arg) { - return context.get(arg); - } - - public void setValue(WorkflowExecutionArgs arg, String value) { - context.put(arg, value); - } - - public String getValue(WorkflowExecutionArgs arg, String defaultValue) { - return context.containsKey(arg) ? context.get(arg) : defaultValue; - } - - public boolean containsKey(WorkflowExecutionArgs arg) { - return context.containsKey(arg); - } - - public Set<Map.Entry<WorkflowExecutionArgs, String>> entrySet() { - return context.entrySet(); - } - - // helper methods - public boolean hasWorkflowSucceeded() { - return Status.SUCCEEDED.name().equals(getValue(WorkflowExecutionArgs.STATUS)); - } - - public boolean hasWorkflowFailed() { - return Status.FAILED.name().equals(getValue(WorkflowExecutionArgs.STATUS)); - } - - public boolean isWorkflowKilledManually(){ - try { - return WorkflowEngineFactory.getWorkflowEngine(). - isWorkflowKilledByUser( - getValue(WorkflowExecutionArgs.CLUSTER_NAME), - getValue(WorkflowExecutionArgs.WORKFLOW_ID)); - } catch (Exception e) { - LOG.error("Got Error in getting error codes from actions: " + e); - } - return false; - } - - public boolean hasWorkflowTimedOut() { - return Status.TIMEDOUT.name().equals(getValue(WorkflowExecutionArgs.STATUS)); - } - - public boolean hasWorkflowBeenKilled() { - return Status.KILLED.name().equals(getValue(WorkflowExecutionArgs.STATUS)); - } - - public String getContextFile() { - return getValue(WorkflowExecutionArgs.CONTEXT_FILE); - } - - public Status getWorkflowStatus() { - return Status.valueOf(getValue(WorkflowExecutionArgs.STATUS)); - } - - public String getLogDir() { - return getValue(WorkflowExecutionArgs.LOG_DIR); - } - - public String getLogFile() { - return getValue(WorkflowExecutionArgs.LOG_FILE); - } - - String getNominalTime() { - return getValue(WorkflowExecutionArgs.NOMINAL_TIME); - } - - /** - * Returns nominal time as a ISO8601 formatted string. - * @return a ISO8601 formatted string - */ - public String getNominalTimeAsISO8601() { - return SchemaHelper.formatDateUTCToISO8601(getNominalTime(), INSTANCE_FORMAT); - } - - String getTimestamp() { - return getValue(WorkflowExecutionArgs.TIMESTAMP); - } - - /** - * Returns timestamp as a long. - * @return Date as long (milliseconds since epoch) for the timestamp. - */ - public long getTimeStampAsLong() { - String dateString = getTimestamp(); - try { - DateFormat dateFormat = new SimpleDateFormat(INSTANCE_FORMAT.substring(0, dateString.length())); - dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); - return dateFormat.parse(dateString).getTime(); - } catch (java.text.ParseException e) { - throw new RuntimeException(e); - } - } - - /** - * Returns timestamp as a ISO8601 formatted string. - * @return a ISO8601 formatted string - */ - public String getTimeStampAsISO8601() { - return SchemaHelper.formatDateUTCToISO8601(getTimestamp(), INSTANCE_FORMAT); - } - - public String getClusterName() { - String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME); - if (EntityOperations.REPLICATE != getOperation()) { - return value; - } - - return value.split(CLUSTER_NAME_SEPARATOR)[0]; - } - - public String getSrcClusterName() { - String value = getValue(WorkflowExecutionArgs.CLUSTER_NAME); - if (EntityOperations.REPLICATE != getOperation()) { - return value; - } - - String[] parts = value.split(CLUSTER_NAME_SEPARATOR); - if (parts.length != 2) { - throw new IllegalArgumentException("Replicated cluster pair is missing in " + value); - } - - return parts[1]; - } - - public String getEntityName() { - return getValue(WorkflowExecutionArgs.ENTITY_NAME); - } - - public String getEntityType() { - return getValue(WorkflowExecutionArgs.ENTITY_TYPE).toUpperCase(); - } - - public EntityOperations getOperation() { - if (getValue(WorkflowExecutionArgs.OPERATION) != null) { - return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.OPERATION)); - } - return EntityOperations.valueOf(getValue(WorkflowExecutionArgs.DATA_OPERATION)); - } - - public String getOutputFeedNames() { - return getValue(WorkflowExecutionArgs.OUTPUT_FEED_NAMES); - } - - public String[] getOutputFeedNamesList() { - return getOutputFeedNames().split(OUTPUT_FEED_SEPARATOR); - } - - public String getOutputFeedInstancePaths() { - return getValue(WorkflowExecutionArgs.OUTPUT_FEED_PATHS); - } - - public String[] getOutputFeedInstancePathsList() { - return getOutputFeedInstancePaths().split(OUTPUT_FEED_SEPARATOR); - } - - public String getInputFeedNames() { - return getValue(WorkflowExecutionArgs.INPUT_FEED_NAMES); - } - - public String[] getInputFeedNamesList() { - return getInputFeedNames().split(INPUT_FEED_SEPARATOR); - } - - public String getInputFeedInstancePaths() { - return getValue(WorkflowExecutionArgs.INPUT_FEED_PATHS); - } - - public String[] getInputFeedInstancePathsList() { - return getInputFeedInstancePaths().split(INPUT_FEED_SEPARATOR); - } - - public String getWorkflowEngineUrl() { - return getValue(WorkflowExecutionArgs.WF_ENGINE_URL); - } - - public String getUserWorkflowEngine() { - return getValue(WorkflowExecutionArgs.USER_WORKFLOW_ENGINE); - } - - public String getUserWorkflowVersion() { - return getValue(WorkflowExecutionArgs.USER_WORKFLOW_VERSION); - } - - public String getWorkflowId() { - return getValue(WorkflowExecutionArgs.WORKFLOW_ID); - } - - public String getWorkflowParentId() { - return getValue(WorkflowExecutionArgs.PARENT_ID); - } - - public String getUserSubflowId() { - return getValue(WorkflowExecutionArgs.USER_SUBFLOW_ID); - } - - public int getWorkflowRunId() { - return Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID)); - } - - public String getWorkflowRunIdString() { - return String.valueOf(Integer.parseInt(getValue(WorkflowExecutionArgs.RUN_ID))); - } - - public String getWorkflowUser() { - return getValue(WorkflowExecutionArgs.WORKFLOW_USER); - } - - public long getExecutionCompletionTime() { - - return creationTime; - } - - public String getDatasourceName() { return getValue(WorkflowExecutionArgs.DATASOURCE_NAME); } - - public long getWorkflowStartTime() { - return Long.parseLong(getValue(WorkflowExecutionArgs.WF_START_TIME)); - } - - public long getWorkflowEndTime() { - return Long.parseLong(getValue(WorkflowExecutionArgs.WF_END_TIME)); - } - - - public Type getContextType() { - return Type.valueOf(getValue(WorkflowExecutionArgs.CONTEXT_TYPE)); - } - - public String getCounters() { - return getValue(WorkflowExecutionArgs.COUNTERS); - } - - /** - * this method is invoked from with in the workflow. - * - * @throws java.io.IOException - * @throws org.apache.falcon.FalconException - */ - public void serialize() throws IOException, FalconException { - serialize(getContextFile()); - } - - /** - * this method is invoked from with in the workflow. - * - * @param contextFile file to serialize the workflow execution metadata - * @throws org.apache.falcon.FalconException - */ - public void serialize(String contextFile) throws FalconException { - LOG.info("Saving context to: [{}]", contextFile); - OutputStream out = null; - Path file = new Path(contextFile); - try { - FileSystem fs = - actionJobConf == null ? HadoopClientFactory.get().createProxiedFileSystem(file.toUri()) - : HadoopClientFactory.get().createProxiedFileSystem(file.toUri(), actionJobConf); - out = fs.create(file); - out.write(JSONValue.toJSONString(context).getBytes()); - } catch (IOException e) { - throw new FalconException("Error serializing context to: " + contextFile, e); - } finally { - if (out != null) { - try { - out.close(); - } catch (IOException ignore) { - // ignore - } - } - } - } - - @Override - public String toString() { - return "WorkflowExecutionContext{" + context.toString() + "}"; - } - - @SuppressWarnings("unchecked") - public static WorkflowExecutionContext deSerialize(String contextFile) throws FalconException { - try { - Path lineageDataPath = new Path(contextFile); // file has 777 permissions - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - lineageDataPath.toUri()); - - BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(lineageDataPath))); - return new WorkflowExecutionContext((Map<WorkflowExecutionArgs, String>) JSONValue.parse(in)); - } catch (IOException e) { - throw new FalconException("Error opening context file: " + contextFile, e); - } - } - - public static String getFilePath(String logDir, String entityName, String entityType, - EntityOperations operation) { - // needed by feed clean up - String parentSuffix = EntityType.PROCESS.name().equals(entityType) - || EntityOperations.REPLICATE == operation ? "" : "/context/"; - - // LOG_DIR is sufficiently unique - return new Path(logDir + parentSuffix, entityName + "-wf-post-exec-context.json").toString(); - } - - - public static Path getCounterFile(String logDir) { - return new Path(logDir, "counter.txt"); - } - - public static String readCounters(FileSystem fs, Path counterFile) throws IOException{ - StringBuilder counterBuffer = new StringBuilder(); - BufferedReader in = new BufferedReader(new InputStreamReader(fs.open(counterFile))); - try { - String line; - while ((line = in.readLine()) != null) { - counterBuffer.append(line); - counterBuffer.append(","); - } - } catch (IOException e) { - throw e; - } finally { - IOUtils.closeQuietly(in); - } - - String counterString = counterBuffer.toString(); - if (StringUtils.isNotBlank(counterString) && counterString.length() > 0) { - return counterString.substring(0, counterString.length() - 1); - } else { - return null; - } - } - - public static WorkflowExecutionContext create(String[] args, Type type) throws FalconException { - return create(args, type, null); - } - - public static WorkflowExecutionContext create(String[] args, Type type, Configuration conf) throws FalconException { - Map<WorkflowExecutionArgs, String> wfProperties = new HashMap<WorkflowExecutionArgs, String>(); - - try { - CommandLine cmd = getCommand(args); - for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) { - String optionValue = arg.getOptionValue(cmd); - if (StringUtils.isNotEmpty(optionValue)) { - wfProperties.put(arg, optionValue); - } - } - } catch (ParseException e) { - throw new FalconException("Error parsing wf args", e); - } - - WorkflowExecutionContext executionContext = new WorkflowExecutionContext(wfProperties); - executionContext.actionJobConf = conf; - executionContext.context.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name()); - executionContext.context.put(WorkflowExecutionArgs.CONTEXT_FILE, - getFilePath(executionContext.getLogDir(), executionContext.getEntityName(), - executionContext.getEntityType(), executionContext.getOperation())); - addCounterToWF(executionContext); - - return executionContext; - } - - private static void addCounterToWF(WorkflowExecutionContext executionContext) throws FalconException { - if (executionContext.hasWorkflowFailed()) { - LOG.info("Workflow Instance failed, counter will not be added: {}", - executionContext.getWorkflowRunIdString()); - return; - } - - FileSystem fs = HadoopClientFactory.get().createProxiedFileSystem( - new Path(executionContext.getLogDir()).toUri()); - Path counterFile = getCounterFile(executionContext.getLogDir()); - try { - if (fs.exists(counterFile)) { - String counters = readCounters(fs, counterFile); - if (StringUtils.isNotBlank(counters)) { - executionContext.context.put(WorkflowExecutionArgs.COUNTERS, counters); - } - } - } catch (IOException e) { - LOG.error("Error in accessing counter file :" + e); - } finally { - try { - if (fs.exists(counterFile)) { - fs.delete(counterFile, false); - } - } catch (IOException e) { - LOG.error("Unable to delete counter file: {}", e); - } - } - } - - private static CommandLine getCommand(String[] arguments) throws ParseException { - Options options = new Options(); - - for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) { - addOption(options, arg, arg.isRequired()); - } - - return new GnuParser().parse(options, arguments, false); - } - - private static void addOption(Options options, WorkflowExecutionArgs arg, boolean isRequired) { - Option option = arg.getOption(); - option.setRequired(isRequired); - options.addOption(option); - } - - public static WorkflowExecutionContext create(Map<WorkflowExecutionArgs, String> wfProperties) { - return WorkflowExecutionContext.create(wfProperties, Type.POST_PROCESSING); - } - - public static WorkflowExecutionContext create(Map<WorkflowExecutionArgs, String> wfProperties, Type type) { - wfProperties.put(WorkflowExecutionArgs.CONTEXT_TYPE, type.name()); - return new WorkflowExecutionContext(wfProperties); - } -}
http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java deleted file mode 100644 index 7bf14f2..0000000 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowExecutionListener.java +++ /dev/null @@ -1,62 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.workflow; - -import org.apache.falcon.FalconException; - -/** - * A listener interface for workflow execution. - */ -public interface WorkflowExecutionListener { - - /** - * Invoked when a workflow is succeeds. - * @param context - * @throws FalconException - */ - void onSuccess(WorkflowExecutionContext context) throws FalconException; - - /** - * Invoked when a workflow fails. - * @param context - * @throws FalconException - */ - void onFailure(WorkflowExecutionContext context) throws FalconException; - - /** - * Invoked on start of a workflow. Basically, when the workflow is RUNNING. - * @param context - * @throws FalconException - */ - void onStart(WorkflowExecutionContext context) throws FalconException; - - /** - * Invoked when a workflow is suspended. - * @param context - * @throws FalconException - */ - void onSuspend(WorkflowExecutionContext context) throws FalconException; - - /** - * Invoked when a workflow is in waiting state. - * @param context - * @throws FalconException - */ - void onWait(WorkflowExecutionContext context) throws FalconException; -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java b/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java deleted file mode 100644 index b692258..0000000 --- a/common/src/main/java/org/apache/falcon/workflow/WorkflowJobEndNotificationService.java +++ /dev/null @@ -1,312 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.workflow; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.FalconException; -import org.apache.falcon.aspect.GenericAlert; -import org.apache.falcon.entity.EntityNotRegisteredException; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.SchemaHelper; -import org.apache.falcon.resource.InstancesResult; -import org.apache.falcon.service.FalconService; -import org.apache.falcon.util.ReflectionUtils; -import org.apache.falcon.util.StartupProperties; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Date; -import java.util.LinkedHashSet; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * A workflow job end notification service. - */ -public class WorkflowJobEndNotificationService implements FalconService { - - private static final Logger LOG = LoggerFactory.getLogger(WorkflowJobEndNotificationService.class); - - public static final String SERVICE_NAME = WorkflowJobEndNotificationService.class.getSimpleName(); - - private Set<WorkflowExecutionListener> listeners = new LinkedHashSet<WorkflowExecutionListener>(); - - // Maintain a cache of context built, so we don't have to query Oozie for every state change. - private Map<String, Properties> contextMap = new ConcurrentHashMap<>(); - - @Override - public String getName() { - return SERVICE_NAME; - } - - // Mainly for test - Map<String, Properties> getContextMap() { - return contextMap; - } - - @Override - public void init() throws FalconException { - String listenerClassNames = StartupProperties.get().getProperty( - "workflow.execution.listeners"); - if (StringUtils.isEmpty(listenerClassNames)) { - return; - } - - for (String listenerClassName : listenerClassNames.split(",")) { - listenerClassName = listenerClassName.trim(); - if (listenerClassName.isEmpty()) { - continue; - } - WorkflowExecutionListener listener = ReflectionUtils.getInstanceByClassName(listenerClassName); - registerListener(listener); - } - } - - @Override - public void destroy() throws FalconException { - listeners.clear(); - } - - public void registerListener(WorkflowExecutionListener listener) { - listeners.add(listener); - } - - public void unregisterListener(WorkflowExecutionListener listener) { - listeners.remove(listener); - } - - public void notifyFailure(WorkflowExecutionContext context) throws FalconException { - notifyWorkflowEnd(context); - } - - public void notifySuccess(WorkflowExecutionContext context) throws FalconException { - notifyWorkflowEnd(context); - } - - public void notifyStart(WorkflowExecutionContext context) throws FalconException { - // Start notifications can only be from Oozie JMS notifications - if (!updateContextFromWFConf(context)) { - return; - } - LOG.debug("Sending workflow start notification to listeners with context : {} ", context); - for (WorkflowExecutionListener listener : listeners) { - try { - listener.onStart(context); - } catch (Throwable t) { - // do not rethrow as other listeners do not get a chance - LOG.error("Error in listener {}", listener.getClass().getName(), t); - } - } - } - - public void notifySuspend(WorkflowExecutionContext context) throws FalconException { - // Suspend notifications can only be from Oozie JMS notifications - if (!updateContextFromWFConf(context)) { - return; - } - LOG.debug("Sending workflow suspend notification to listeners with context : {} ", context); - for (WorkflowExecutionListener listener : listeners) { - try { - listener.onSuspend(context); - } catch (Throwable t) { - // do not rethrow as other listeners do not get a chance - LOG.error("Error in listener {}", listener.getClass().getName(), t); - } - } - - instrumentAlert(context); - contextMap.remove(context.getWorkflowId()); - } - - public void notifyWait(WorkflowExecutionContext context) throws FalconException { - // Wait notifications can only be from Oozie JMS notifications - LOG.debug("Sending workflow wait notification to listeners with context : {} ", context); - for (WorkflowExecutionListener listener : listeners) { - try { - listener.onWait(context); - } catch (Throwable t) { - // do not rethrow as other listeners do not get a chance - LOG.error("Error in listener {}", listener.getClass().getName(), t); - } - } - } - - // The method retrieves the conf from the cache if it is in cache. - // Else, queries WF Engine to retrieve the conf of the workflow - private boolean updateContextFromWFConf(WorkflowExecutionContext context) throws FalconException { - Properties wfProps = contextMap.get(context.getWorkflowId()); - if (wfProps == null) { - Entity entity = null; - try { - entity = EntityUtil.getEntity(context.getEntityType(), context.getEntityName()); - } catch (EntityNotRegisteredException e) { - // Entity no longer exists. No need to notify. - LOG.debug("Entity {} of type {} doesn't exist in config store. Notification Ignored.", - context.getEntityName(), context.getEntityType()); - contextMap.remove(context.getWorkflowId()); - return false; - } - for (String cluster : EntityUtil.getClustersDefinedInColos(entity)) { - try { - InstancesResult.Instance[] instances = WorkflowEngineFactory.getWorkflowEngine(entity) - .getJobDetails(cluster, context.getWorkflowId()).getInstances(); - if (instances != null && instances.length > 0) { - wfProps = getWFProps(instances[0].getWfParams()); - // Required by RetryService. But, is not part of conf. - wfProps.setProperty(WorkflowExecutionArgs.RUN_ID.getName(), - Integer.toString(instances[0].getRunId())); - } - } catch (FalconException e) { - // Do Nothing. Move on to the next cluster. - continue; - } - contextMap.put(context.getWorkflowId(), wfProps); - } - } - - // No extra props to enhance the context with. - if (wfProps == null || wfProps.isEmpty()) { - return true; - } - - for (WorkflowExecutionArgs arg : WorkflowExecutionArgs.values()) { - if (wfProps.containsKey(arg.getName())) { - context.setValue(arg, wfProps.getProperty(arg.getName())); - } - } - return true; - } - - private Properties getWFProps(InstancesResult.KeyValuePair[] wfParams) { - Properties props = new Properties(); - for (InstancesResult.KeyValuePair kv : wfParams) { - props.put(kv.getKey(), kv.getValue()); - } - return props; - } - - // This method handles both success and failure notifications. - private void notifyWorkflowEnd(WorkflowExecutionContext context) throws FalconException { - // Need to distinguish notification from post processing for backward compatibility - if (context.getContextType() == WorkflowExecutionContext.Type.POST_PROCESSING) { - boolean engineNotifEnabled = false; - try { - engineNotifEnabled = WorkflowEngineFactory.getWorkflowEngine() - .isNotificationEnabled(context.getClusterName(), context.getWorkflowId()); - } catch (FalconException e) { - LOG.debug("Received error while checking if notification is enabled. " - + "Hence, assuming notification is not enabled."); - } - // Ignore the message from post processing as there will be one more from Oozie. - if (engineNotifEnabled) { - LOG.info("Ignoring message from post processing as engine notification is enabled."); - return; - } else { - updateContextWithTime(context); - } - } else { - if (!updateContextFromWFConf(context)) { - return; - } - } - - LOG.debug("Sending workflow end notification to listeners with context : {} ", context); - - for (WorkflowExecutionListener listener : listeners) { - try { - if (context.hasWorkflowSucceeded()) { - listener.onSuccess(context); - instrumentAlert(context); - } else { - listener.onFailure(context); - if (context.hasWorkflowBeenKilled() || context.hasWorkflowFailed()) { - instrumentAlert(context); - } - } - } catch (Throwable t) { - // do not rethrow as other listeners do not get a chance - LOG.error("Error in listener {}", listener.getClass().getName(), t); - } - } - - contextMap.remove(context.getWorkflowId()); - } - - // In case of notifications coming from post notifications, start and end time need to be populated. - private void updateContextWithTime(WorkflowExecutionContext context) { - try { - InstancesResult result = WorkflowEngineFactory.getWorkflowEngine() - .getJobDetails(context.getClusterName(), context.getWorkflowId()); - Date startTime = result.getInstances()[0].startTime; - Date endTime = result.getInstances()[0].endTime; - Date now = new Date(); - if (startTime == null) { - startTime = now; - } - if (endTime == null) { - endTime = now; - } - context.setValue(WorkflowExecutionArgs.WF_START_TIME, Long.toString(startTime.getTime())); - context.setValue(WorkflowExecutionArgs.WF_END_TIME, Long.toString(endTime.getTime())); - } catch(FalconException e) { - LOG.error("Unable to retrieve job details for " + context.getWorkflowId() + " on cluster " - + context.getClusterName(), e); - } - } - - private void instrumentAlert(WorkflowExecutionContext context) { - String clusterName = context.getClusterName(); - String entityName = context.getEntityName(); - String entityType = context.getEntityType(); - String operation = context.getOperation().name(); - String workflowId = context.getWorkflowId(); - String workflowUser = context.getWorkflowUser(); - String nominalTime = context.getNominalTimeAsISO8601(); - String runId = String.valueOf(context.getWorkflowRunId()); - Date now = new Date(); - // Start and/or End time may not be set in case of workflow suspend - Date endTime; - if (context.getWorkflowEndTime() == 0) { - endTime = now; - } else { - endTime = new Date(context.getWorkflowEndTime()); - } - - Date startTime; - if (context.getWorkflowStartTime() == 0) { - startTime = now; - } else { - startTime = new Date(context.getWorkflowStartTime()); - } - Long duration = (endTime.getTime() - startTime.getTime()) * 1000000; - - if (context.hasWorkflowFailed()) { - GenericAlert.instrumentFailedInstance(clusterName, entityType, - entityName, nominalTime, workflowId, workflowUser, runId, operation, - SchemaHelper.formatDateUTC(startTime), "", "", duration); - } else { - GenericAlert.instrumentSucceededInstance(clusterName, entityType, - entityName, nominalTime, workflowId, workflowUser, runId, operation, - SchemaHelper.formatDateUTC(startTime), duration); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java b/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java deleted file mode 100644 index 4d8402a..0000000 --- a/common/src/main/java/org/apache/falcon/workflow/engine/AbstractWorkflowEngine.java +++ /dev/null @@ -1,120 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.workflow.engine; - -import org.apache.falcon.FalconException; -import org.apache.falcon.LifeCycle; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.cluster.Cluster; -import org.apache.falcon.resource.InstancesResult; -import org.apache.falcon.resource.InstancesSummaryResult; - -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; - - -/** - * Workflow engine should minimally support the - * following operations. - */ -public abstract class AbstractWorkflowEngine { - - public static final String NAME_NODE = "nameNode"; - public static final String JOB_TRACKER = "jobTracker"; - - protected Set<WorkflowEngineActionListener> listeners = new HashSet<WorkflowEngineActionListener>(); - - public void registerListener(WorkflowEngineActionListener listener) { - listeners.add(listener); - } - - public abstract boolean isAlive(Cluster cluster) throws FalconException; - - public abstract void schedule(Entity entity, Boolean skipDryRun, Map<String, String> properties) - throws FalconException; - - public abstract String suspend(Entity entity) throws FalconException; - - public abstract String resume(Entity entity) throws FalconException; - - public abstract String delete(Entity entity) throws FalconException; - - public abstract String delete(Entity entity, String cluster) throws FalconException; - - public abstract String reRun(String cluster, String wfId, Properties props, boolean isForced) - throws FalconException; - - public abstract void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException; - - public abstract boolean isActive(Entity entity) throws FalconException; - - public abstract boolean isSuspended(Entity entity) throws FalconException; - - public abstract boolean isCompleted(Entity entity) throws FalconException; - - public abstract InstancesResult getRunningInstances(Entity entity, - List<LifeCycle> lifeCycles) throws FalconException; - - public abstract InstancesResult killInstances(Entity entity, Date start, Date end, Properties props, - List<LifeCycle> lifeCycles) throws FalconException; - - public abstract InstancesResult reRunInstances(Entity entity, Date start, Date end, Properties props, - List<LifeCycle> lifeCycles, Boolean isForced) throws FalconException; - - public abstract InstancesResult suspendInstances(Entity entity, Date start, Date end, Properties props, - List<LifeCycle> lifeCycles) throws FalconException; - - public abstract InstancesResult resumeInstances(Entity entity, Date start, Date end, Properties props, - List<LifeCycle> lifeCycles) throws FalconException; - - public abstract InstancesResult getStatus(Entity entity, Date start, Date end, - List<LifeCycle> lifeCycles, Boolean allAttempts) throws FalconException; - - public abstract InstancesSummaryResult getSummary(Entity entity, Date start, Date end, - List<LifeCycle> lifeCycles) throws FalconException; - - public abstract String update(Entity oldEntity, Entity newEntity, - String cluster, Boolean skipDryRun) throws FalconException; - - public abstract String touch(Entity entity, String cluster, Boolean skipDryRun) throws FalconException; - - public abstract String getWorkflowStatus(String cluster, String jobId) throws FalconException; - - public abstract Properties getWorkflowProperties(String cluster, String jobId) throws FalconException; - - public abstract InstancesResult getJobDetails(String cluster, String jobId) throws FalconException; - - public abstract InstancesResult getInstanceParams(Entity entity, Date start, Date end, - List<LifeCycle> lifeCycles) throws FalconException; - - public abstract boolean isNotificationEnabled(String cluster, String jobID) throws FalconException; - - public abstract Boolean isWorkflowKilledByUser(String cluster, String jobId) throws FalconException; - - - /** - * Returns the short name of the Workflow Engine. - * @return - */ - public abstract String getName(); -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java b/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java deleted file mode 100644 index 2a1cbd4..0000000 --- a/common/src/main/java/org/apache/falcon/workflow/engine/WorkflowEngineActionListener.java +++ /dev/null @@ -1,45 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.workflow.engine; - -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.v0.Entity; - -/** - * Listener that will be notified before and after - * workflow life cycle operations are performed. - */ -public interface WorkflowEngineActionListener { - - void beforeSchedule(Entity entity, String cluster) throws FalconException; - - void afterSchedule(Entity entity, String cluster) throws FalconException; - - void beforeDelete(Entity entity, String cluster) throws FalconException; - - void afterDelete(Entity entity, String cluster) throws FalconException; - - void beforeSuspend(Entity entity, String cluster) throws FalconException; - - void afterSuspend(Entity entity, String cluster) throws FalconException; - - void beforeResume(Entity entity, String cluster) throws FalconException; - - void afterResume(Entity entity, String cluster) throws FalconException; -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java b/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java deleted file mode 100644 index 3f07c3c..0000000 --- a/common/src/main/java/org/apache/falcon/workflow/util/OozieActionConfigurationHelper.java +++ /dev/null @@ -1,80 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.workflow.util; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.util.Shell; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.io.StringWriter; - -/** - * Utility to read oozie action conf at oozie.action.conf.xml. - */ -public final class OozieActionConfigurationHelper { - - private static final Logger LOG = LoggerFactory.getLogger(OozieActionConfigurationHelper.class); - - private OozieActionConfigurationHelper() { - } - - public static Configuration createActionConf() throws IOException { - Configuration conf = new Configuration(); - Path confPath = new Path("file:///" + System.getProperty("oozie.action.conf.xml")); - - final boolean actionConfExists = confPath.getFileSystem(conf).exists(confPath); - LOG.info("Oozie Action conf {} found ? {}", confPath, actionConfExists); - if (actionConfExists) { - LOG.info("Oozie Action conf found, adding path={}, conf={}", confPath, conf.toString()); - conf.addResource(confPath); - dumpConf(conf, "oozie action conf "); - } - - String tokenFile = System.getenv("HADOOP_TOKEN_FILE_LOCATION"); - if (tokenFile != null) { - if (Shell.WINDOWS) { - if (tokenFile.charAt(0) == '"') { - tokenFile = tokenFile.substring(1); - } - if (tokenFile.charAt(tokenFile.length() - 1) == '"') { - tokenFile = tokenFile.substring(0, tokenFile.length() - 1); - } - } - - conf.set("mapreduce.job.credentials.binary", tokenFile); - System.setProperty("mapreduce.job.credentials.binary", tokenFile); - conf.set("tez.credentials.path", tokenFile); - System.setProperty("tez.credentials.path", tokenFile); - } - - conf.set("datanucleus.plugin.pluginRegistryBundleCheck", "LOG"); - conf.setBoolean("hive.exec.mode.local.auto", false); - - return conf; - } - - public static void dumpConf(Configuration conf, String message) throws IOException { - StringWriter writer = new StringWriter(); - Configuration.dumpConfiguration(conf, writer); - LOG.info(message + " {}", writer); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java b/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java deleted file mode 100644 index 05f248e..0000000 --- a/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java +++ /dev/null @@ -1,33 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.workflow.util; - -/** - * Oozie Constants used across multiple modules. - */ -public final class OozieConstants { - /** - * Constant for the oozie running in local. - */ - public static final String LOCAL_OOZIE = "localoozie"; - - private OozieConstants() { - } - -} http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/log4j.xml ---------------------------------------------------------------------- diff --git a/common/src/main/resources/log4j.xml b/common/src/main/resources/log4j.xml deleted file mode 100644 index 75c8267..0000000 --- a/common/src/main/resources/log4j.xml +++ /dev/null @@ -1,86 +0,0 @@ -<?xml version="1.0" encoding="UTF-8" ?> -<!-- - Licensed to the Apache Software Foundation (ASF) under one - or more contributor license agreements. See the NOTICE file - distributed with this work for additional information - regarding copyright ownership. The ASF licenses this file - to you under the Apache License, Version 2.0 (the - "License"); you may not use this file except in compliance - with the License. You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - - Unless required by applicable law or agreed to in writing, software - distributed under the License is distributed on an "AS IS" BASIS, - WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - See the License for the specific language governing permissions and - limitations under the License. - --> - -<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> - -<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> - <appender name="console" class="org.apache.log4j.ConsoleAppender"> - <param name="Target" value="System.out"/> - <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/> - </layout> - </appender> - - <appender name="FILE" class="org.apache.log4j.DailyRollingFileAppender"> - <param name="File" value="${user.dir}/target/logs/application.log"/> - <param name="Append" value="true"/> - <param name="Threshold" value="debug"/> - <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %-5p - [%t:%x] ~ %m (%c{1}:%L)%n"/> - </layout> - </appender> - - <appender name="AUDIT" class="org.apache.log4j.DailyRollingFileAppender"> - <param name="File" value="${user.dir}/target/logs/audit.log"/> - <param name="Append" value="true"/> - <param name="Threshold" value="debug"/> - <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %x %m%n"/> - </layout> - </appender> - - <appender name="METRIC" class="org.apache.log4j.DailyRollingFileAppender"> - <param name="File" value="${user.dir}/target/logs/metric.log"/> - <param name="Append" value="true"/> - <param name="Threshold" value="debug"/> - <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %m%n"/> - </layout> - </appender> - - <appender name="ALERT" class="org.apache.log4j.DailyRollingFileAppender"> - <param name="File" value="${falcon.log.dir}/${falcon.app.type}.alerts.log"/> - <param name="Append" value="true"/> - <param name="Threshold" value="debug"/> - <layout class="org.apache.log4j.PatternLayout"> - <param name="ConversionPattern" value="%d %m%n"/> - </layout> - </appender> - - <logger name="org.apache.falcon" additivity="false"> - <level value="debug"/> - <appender-ref ref="FILE"/> - </logger> - - <logger name="AUDIT"> - <level value="info"/> - <appender-ref ref="AUDIT"/> - </logger> - - <logger name="METRIC"> - <level value="info"/> - <appender-ref ref="METRIC"/> - </logger> - - <root> - <priority value="info"/> - <appender-ref ref="console"/> - </root> - -</log4j:configuration> http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/runtime.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/runtime.properties b/common/src/main/resources/runtime.properties deleted file mode 100644 index 643559e..0000000 --- a/common/src/main/resources/runtime.properties +++ /dev/null @@ -1,54 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -*.domain=debug - -*.falcon.parentworkflow.retry.max=3 -*.falcon.parentworkflow.retry.interval.secs=1 - -*.falcon.replication.workflow.maxmaps=5 -*.falcon.replication.workflow.mapbandwidth=100 -*.webservices.default.results.per.page=10 - -# If true, do not run retention past feedCluster validity end time. -# This will retain recent instances beyond feedCluster validity end time. -*.falcon.retention.keep.instances.beyond.validity=true - -# Default configs to handle replication for late arriving feeds. -*.feed.late.allowed=true -*.feed.late.frequency=hours(3) -*.feed.late.policy=exp-backoff - -# If true, Falcon skips oozie dryrun while scheduling entities. -*.falcon.skip.dryrun=false - -######### Proxyuser Configuration Start ######### - -#List of hosts the '#USER#' user is allowed to perform 'doAs 'operations from. The '#USER#' must be replaced with the -#username of the user who is allowed to perform 'doAs' operations. The value can be the '*' wildcard or a list of -#comma separated hostnames - -*.falcon.service.ProxyUserService.proxyuser.#USER#.hosts=* - -#List of groups the '#USER#' user is allowed to 'doAs 'operations. The '#USER#' must be replaced with the -#username of the user who is allowed to perform 'doAs' operations. The value can be the '*' wildcard or a list of -#comma separated groups - -*.falcon.service.ProxyUserService.proxyuser.#USER#.groups=* - -######### Proxyuser Configuration End ######### \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties deleted file mode 100644 index 2497cce..0000000 --- a/common/src/main/resources/startup.properties +++ /dev/null @@ -1,306 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -*.domain=debug - -######### Implementation classes ######### -## DONT MODIFY UNLESS SURE ABOUT CHANGE ## - -*.workflow.engine.impl=org.apache.falcon.workflow.engine.OozieWorkflowEngine -*.lifecycle.engine.impl=org.apache.falcon.lifecycle.engine.oozie.OoziePolicyBuilderFactory -*.oozie.process.workflow.builder=org.apache.falcon.workflow.OozieProcessWorkflowBuilder -*.oozie.feed.workflow.builder=org.apache.falcon.workflow.OozieFeedWorkflowBuilder -*.SchedulableEntityManager.impl=org.apache.falcon.resource.SchedulableEntityManager -*.ConfigSyncService.impl=org.apache.falcon.resource.ConfigSyncService -*.ProcessInstanceManager.impl=org.apache.falcon.resource.InstanceManager -*.catalog.service.impl=org.apache.falcon.catalog.HiveCatalogService - -##### Falcon Services ##### -*.application.services=org.apache.falcon.security.AuthenticationInitializationService,\ - org.apache.falcon.workflow.WorkflowJobEndNotificationService, \ - org.apache.falcon.service.ProcessSubscriberService,\ - org.apache.falcon.service.FeedSLAMonitoringService,\ - org.apache.falcon.service.LifecyclePolicyMap,\ - org.apache.falcon.entity.store.ConfigurationStore,\ - org.apache.falcon.rerun.service.RetryService,\ - org.apache.falcon.rerun.service.LateRunService,\ - org.apache.falcon.metadata.MetadataMappingService,\ - org.apache.falcon.service.LogCleanupService,\ - org.apache.falcon.service.GroupsService,\ - org.apache.falcon.service.ProxyUserService,\ - org.apache.falcon.adfservice.ADFProviderService -## If you wish to use Falcon native scheduler add the commented out services below to application.services ## -# org.apache.falcon.notification.service.impl.JobCompletionService,\ -# org.apache.falcon.notification.service.impl.SchedulerService,\ -# org.apache.falcon.notification.service.impl.AlarmService,\ -# org.apache.falcon.notification.service.impl.DataAvailabilityService,\ -# org.apache.falcon.execution.FalconExecutionService,\ -# org.apache.falcon.state.store.service.FalconJPAService - - -# List of Lifecycle policies configured. -*.falcon.feed.lifecycle.policies=org.apache.falcon.lifecycle.retention.AgeBasedDelete -# List of builders for the policies. -*.falcon.feed.lifecycle.policy.builders=org.apache.falcon.lifecycle.engine.oozie.retention.AgeBasedDeleteBuilder -##### Falcon Configuration Store Change listeners ##### -*.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ - org.apache.falcon.entity.ColoClusterRelation,\ - org.apache.falcon.group.FeedGroupMap,\ - org.apache.falcon.entity.store.FeedLocationStore,\ - org.apache.falcon.service.FeedSLAMonitoringService,\ - org.apache.falcon.service.SharedLibraryHostingService -## If you wish to use Falcon native scheduler, add the State store as a configstore listener. ## -# org.apache.falcon.state.store.jdbc.JdbcStateStore - -##### JMS MQ Broker Implementation class ##### -*.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory - -##### List of shared libraries for Falcon workflows ##### -*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3 - -##### Workflow Job Execution Completion listeners ##### -*.workflow.execution.listeners= - -######### Implementation classes ######### - - -######### System startup parameters ######### - -# Location of libraries that is shipped to Hadoop -*.system.lib.location=${FALCON_HOME}/sharedlibs - -# Location to store user entity configurations - -#Configurations used in UTs -debug.config.store.uri=file://${user.dir}/target/store -#Location to store state of Feed SLA monitoring service -debug.feed.sla.service.store.uri= file://${user.dir}/target/data/sla/pendingfeedinstances -debug.config.oozie.conf.uri=${user.dir}/target/oozie -debug.system.lib.location=${system.lib.location} -debug.broker.url=vm://localhost -debug.retry.recorder.path=${user.dir}/target/retry -debug.libext.feed.retention.paths=${falcon.libext} -debug.libext.feed.replication.paths=${falcon.libext} -debug.libext.process.paths=${falcon.libext} - -#Configurations used in ITs -it.config.store.uri=file://${user.dir}/target/store -it.config.oozie.conf.uri=${user.dir}/target/oozie -it.system.lib.location=${system.lib.location} -it.broker.url=tcp://localhost:61616 -it.retry.recorder.path=${user.dir}/target/retry -it.libext.feed.retention.paths=${falcon.libext} -it.libext.feed.replication.paths=${falcon.libext} -it.libext.process.paths=${falcon.libext} -it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandler - -*.falcon.cleanup.service.frequency=minutes(5) - -######### Properties for Feed SLA Monitoring ######### -# frequency of serialization for the state of FeedSLAMonitoringService - 1 hour -*.feed.sla.serialization.frequency.millis=3600000 - -# Maximum number of pending instances per feed that will be recorded. After this older instances will be removed in -# a FIFO fashion. -*.feed.sla.queue.size=288 - -# Do not change unless really sure -# Frequency in seconds of "status check" for pending feed instances, default is 10 mins = 10 * 60 -*.feed.sla.statusCheck.frequency.seconds=600 - -# Do not change unless really sure -# Time Duration (in milliseconds) in future for generating pending feed instances. -# In every cycle pending feed instances are added for monitoring, till this time in future. -# It must be more than statusCheck frequency, default is 15 mins = 15 * 60 * 1000 -*.feed.sla.lookAheadWindow.millis=900000 - - -######### Properties for configuring JMS provider - activemq ######### -# Default Active MQ url -*.broker.url=tcp://localhost:61616 - -# default time-to-live for a JMS message 3 days (time in minutes) -*.broker.ttlInMins=4320 -*.entity.topic=FALCON.ENTITY.TOPIC -*.max.retry.failure.count=1 -*.retry.recorder.path=${user.dir}/logs/retry - -######### Properties for configuring iMon client and metric ######### -*.internal.queue.size=1000 - - -######### Graph Database Properties ######### -# Graph implementation -*.falcon.graph.blueprints.graph=com.thinkaurelius.titan.core.TitanFactory - -# Graph Storage -*.falcon.graph.storage.directory=${user.dir}/target/graphdb -*.falcon.graph.storage.backend=berkeleyje -*.falcon.graph.serialize.path=${user.dir}/target/graphdb -*.falcon.graph.preserve.history=false -*.falcon.graph.transaction.retry.count=3 -*.falcon.graph.transaction.retry.delay=5 - -# Uncomment and override the following properties for enabling metrics for titan db and pushing them to graphite. You -# can use other reporters like ganglia also. -# Refer (http://thinkaurelius.github.io/titan/wikidoc/0.4.2/Titan-Performance-and-Monitoring)for finding the -# relevant configurations for your use case. NOTE: you have to prefix all the properties with "*.falcon.graph." -# *.falcon.graph.storage.enable-basic-metrics = true -# Required; IP or hostname string -# *.falcon.graph.metrics.graphite.hostname = 192.168.0.1 -# Required; specify logging interval in milliseconds -# *.falcon.graph.metrics.graphite.interval = 60000 - -######### Authentication Properties ######### - -# Authentication type must be specified: simple|kerberos -*.falcon.authentication.type=simple - -##### Service Configuration - -# Indicates the Kerberos principal to be used in Falcon Service. -*.falcon.service.authentication.kerberos.principal= - -# Location of the keytab file with the credentials for the Service principal. -*.falcon.service.authentication.kerberos.keytab= - -# name node principal to talk to config store -*.dfs.namenode.kerberos.principal= - -##### SPNEGO Configuration - -# Authentication type must be specified: simple|kerberos|<class> -# org.apache.falcon.security.RemoteUserInHeaderBasedAuthenticationHandler can be used for backwards compatibility -*.falcon.http.authentication.type=simple - -# Indicates how long (in seconds) an authentication token is valid before it has to be renewed. -*.falcon.http.authentication.token.validity=36000 - -# The signature secret for signing the authentication tokens. -*.falcon.http.authentication.signature.secret=falcon - -# The domain to use for the HTTP cookie that stores the authentication token. -*.falcon.http.authentication.cookie.domain= - -# Indicates if anonymous requests are allowed when using 'simple' authentication. -*.falcon.http.authentication.simple.anonymous.allowed=false - -# Indicates the Kerberos principal to be used for HTTP endpoint. -# The principal MUST start with 'HTTP/' as per Kerberos HTTP SPNEGO specification. -*.falcon.http.authentication.kerberos.principal= - -# Location of the keytab file with the credentials for the HTTP principal. -*.falcon.http.authentication.kerberos.keytab= - -# The kerberos names rules is to resolve kerberos principal names, refer to Hadoop's KerberosName for more details. -*.falcon.http.authentication.kerberos.name.rules=DEFAULT - -# Comma separated list of black listed users -*.falcon.http.authentication.blacklisted.users= - -######### Authentication Properties ######### - - -######### Authorization Properties ######### - -# Authorization Enabled flag: false (default)|true -*.falcon.security.authorization.enabled=false - -# The name of the group of super-users -*.falcon.security.authorization.superusergroup=falcon - -# Admin Users, comma separated users -*.falcon.security.authorization.admin.users=falcon,ambari-qa - -# Admin Group Membership, comma separated users -*.falcon.security.authorization.admin.groups=falcon,staff - -# Authorization Provider Implementation Fully Qualified Class Name -*.falcon.security.authorization.provider=org.apache.falcon.security.DefaultAuthorizationProvider - -######### Authorization Properties ######### - -######### ADF Configurations start ######### - -# A String object that represents the namespace -*.microsoft.windowsazure.services.servicebus.namespace= - -# Request and status queues on the namespace -*.microsoft.windowsazure.services.servicebus.requestqueuename= -*.microsoft.windowsazure.services.servicebus.statusqueuename= - -# A String object that contains the SAS key name -*.microsoft.windowsazure.services.servicebus.sasKeyName= - -# A String object that contains the SAS key -*.microsoft.windowsazure.services.servicebus.sasKey= - -# A String object containing the base URI that is added to your Service Bus namespace to form the URI to connect -# to the Service Bus service. To access the default public Azure service, pass ".servicebus.windows.net" -*.microsoft.windowsazure.services.servicebus.serviceBusRootUri= - -# Service bus polling frequency -*.microsoft.windowsazure.services.servicebus.polling.frequency= - -# Super user -*.microsoft.windowsazure.services.servicebus.superuser= - -######### ADF Configurations end ########### - -######### SMTP Properties ######## - -# Setting SMTP hostname -#*.falcon.email.smtp.host=localhost - -# Setting SMTP port number -#*.falcon.email.smtp.port=25 - -# Setting email from address -#*.falcon.email.from.address=falcon@localhost - -# Setting email Auth -#*.falcon.email.smtp.auth=false - -#Setting user name -#*.falcon.email.smtp.user="" - -#Setting password -#*.falcon.email.smtp.password="" - -# Setting monitoring plugin, if SMTP parameters is defined -#*.monitoring.plugins=org.apache.falcon.plugin.DefaultMonitoringPlugin,\ -# org.apache.falcon.plugin.EmailNotificationPlugin - -######### StateStore Properties ##### -#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore -#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver -#*.falcon.statestore.jdbc.url=jdbc:derby:data/statestore.db;create=true -#*.falcon.statestore.jdbc.username=sa -#*.falcon.statestore.jdbc.password= -#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource -## Maximum number of active connections that can be allocated from this pool at the same time. -#*.falcon.statestore.pool.max.active.conn=10 -#*.falcon.statestore.connection.properties= -## Indicates the interval (in milliseconds) between eviction runs. -#*.falcon.statestore.validate.db.connection.eviction.interval=300000 -## The number of objects to examine during each run of the idle object evictor thread. -#*.falcon.statestore.validate.db.connection.eviction.num=10 -## Creates Falcon DB. -## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. -## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. -#*.falcon.statestore.create.db.schema=true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/statestore.credentials ---------------------------------------------------------------------- diff --git a/common/src/main/resources/statestore.credentials b/common/src/main/resources/statestore.credentials deleted file mode 100644 index 86c32a1..0000000 --- a/common/src/main/resources/statestore.credentials +++ /dev/null @@ -1,22 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - - -######### StateStore Credentials ##### -#*.falcon.statestore.jdbc.username=sa -#*.falcon.statestore.jdbc.password= \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/main/resources/statestore.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/statestore.properties b/common/src/main/resources/statestore.properties deleted file mode 100644 index 44e79b3..0000000 --- a/common/src/main/resources/statestore.properties +++ /dev/null @@ -1,45 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -*.domain=debug - -######### StateStore Properties ##### -#*.falcon.state.store.impl=org.apache.falcon.state.store.jdbc.JDBCStateStore -#*.falcon.statestore.jdbc.driver=org.apache.derby.jdbc.EmbeddedDriver -## Falcon currently supports derby, mysql and postgreSQL, change url based on DB. -#*.falcon.statestore.jdbc.url=jdbc:derby:data/falcon.db;create=true - -## StateStore credentials file where username,password and other properties can be stored securely. -## Set this credentials file permission 400 and make sure user who starts falcon should only have read permission. -## Give Absolute path to credentials file along with file name or put in classpath with filename statestore.credentials. -## Credentials file should be present either in given location or class path, otherwise falcon won't start. -#*.falcon.statestore.credentials.file= - -#*.falcon.statestore.connection.data.source=org.apache.commons.dbcp.BasicDataSource -## Maximum number of active connections that can be allocated from this pool at the same time. -#*.falcon.statestore.pool.max.active.conn=10 -## Any additional connection properties that need to be used, specified as comma separated key=value pairs. -#*.falcon.statestore.connection.properties= -## Indicates the interval (in milliseconds) between eviction runs. -#*.falcon.statestore.validate.db.connection.eviction.interval=300000 -## The number of objects to examine during each run of the idle object evictor thread. -#*.falcon.statestore.validate.db.connection.eviction.num=10 -## Creates Falcon DB. -## If set to true, it creates the DB schema if it does not exist. If the DB schema exists is a NOP. -## If set to false, it does not create the DB schema. If the DB schema does not exist it fails start up. -#*.falcon.statestore.create.db.schema=true \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/8e49379d/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java b/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java deleted file mode 100644 index 0df59b2..0000000 --- a/common/src/test/java/org/apache/falcon/cleanup/LogCleanupServiceTest.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.falcon.cleanup; - -import org.apache.falcon.FalconException; -import org.apache.falcon.cluster.util.EmbeddedCluster; -import org.apache.falcon.entity.AbstractTestBase; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.feed.Feed; -import org.apache.falcon.entity.v0.process.Process; -import org.apache.falcon.security.CurrentUser; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.testng.Assert; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; - -import java.io.IOException; -import java.io.InputStream; - -/** - * Test for log cleanup service. - */ -public class LogCleanupServiceTest extends AbstractTestBase { - - private FileSystem fs; - private FileSystem tfs; - private EmbeddedCluster targetDfsCluster; - - private final Path instanceLogPath = new Path("/projects/falcon/staging/falcon/workflows/process/" - + "sample" + "/logs/job-2010-01-01-01-00/000"); - private final Path instanceLogPath1 = new Path("/projects/falcon/staging/falcon/workflows/process/" - + "sample" + "/logs/job-2010-01-01-01-00/001"); - private final Path instanceLogPath2 = new Path("/projects/falcon/staging/falcon/workflows/process/" - + "sample" + "/logs/job-2010-01-01-02-00/001"); - private final Path instanceLogPath3 = new Path("/projects/falcon/staging/falcon/workflows/process/" - + "sample2" + "/logs/job-2010-01-01-01-00/000"); - private final Path instanceLogPath4 = new Path("/projects/falcon/staging/falcon/workflows/process/" - + "sample" + "/logs/latedata/2010-01-01-01-00"); - private final Path instanceLogPath5 = new Path("/projects/falcon/staging/falcon/workflows/process/" - + "sample3" + "/logs/job-2010-01-01-01-00/000"); - private final Path feedInstanceLogPath = new Path("/projects/falcon/staging/falcon/workflows/feed/" - + "impressionFeed" + "/logs/job-2010-01-01-01-00/testCluster/000"); - private final Path feedInstanceLogPath1 = new Path("/projects/falcon/staging/falcon/workflows/feed/" - + "impressionFeed2" + "/logs/job-2010-01-01-01-00/testCluster/000"); - - - @AfterClass - public void tearDown() { - this.dfsCluster.shutdown(); - this.targetDfsCluster.shutdown(); - } - - @Override - @BeforeClass - public void setup() throws Exception { - this.dfsCluster = EmbeddedCluster.newCluster("testCluster", CurrentUser.getUser()); - conf = dfsCluster.getConf(); - fs = dfsCluster.getFileSystem(); - fs.delete(new Path("/"), true); - - storeEntity(EntityType.CLUSTER, "testCluster"); - System.setProperty("test.build.data", "target/tdfs/data" + System.currentTimeMillis()); - this.targetDfsCluster = EmbeddedCluster.newCluster("backupCluster"); - conf = targetDfsCluster.getConf(); - - storeEntity(EntityType.CLUSTER, "backupCluster"); - storeEntity(EntityType.FEED, "impressionFeed"); - storeEntity(EntityType.FEED, "clicksFeed"); - storeEntity(EntityType.FEED, "imp-click-join1"); - storeEntity(EntityType.FEED, "imp-click-join2"); - storeEntity(EntityType.PROCESS, "sample"); - Process process = ConfigurationStore.get().get(EntityType.PROCESS, "sample"); - Process otherProcess = (Process) process.copy(); - otherProcess.setName("sample2"); - otherProcess.setFrequency(new Frequency("days(1)")); - Process noACLProcess = (Process) process.copy(); - noACLProcess.setName("sample3"); - noACLProcess.setACL(null); - ConfigurationStore.get().remove(EntityType.PROCESS, - otherProcess.getName()); - ConfigurationStore.get().publish(EntityType.PROCESS, otherProcess); - ConfigurationStore.get().remove(EntityType.PROCESS, - noACLProcess.getName()); - ConfigurationStore.get().publish(EntityType.PROCESS, noACLProcess); - - fs.mkdirs(instanceLogPath); - fs.mkdirs(instanceLogPath1); - fs.mkdirs(instanceLogPath2); - fs.mkdirs(instanceLogPath3); - fs.mkdirs(instanceLogPath4); - fs.mkdirs(instanceLogPath5); - - // fs.setTimes wont work on dirs - fs.createNewFile(new Path(instanceLogPath, "oozie.log")); - fs.createNewFile(new Path(instanceLogPath, "pigAction_SUCCEEDED.log")); - - tfs = targetDfsCluster.getFileSystem(); - tfs.delete(new Path("/"), true); - fs.mkdirs(feedInstanceLogPath); - fs.mkdirs(feedInstanceLogPath1); - tfs.mkdirs(feedInstanceLogPath); - tfs.mkdirs(feedInstanceLogPath1); - fs.createNewFile(new Path(feedInstanceLogPath, "oozie.log")); - tfs.createNewFile(new Path(feedInstanceLogPath, "oozie.log")); - - // table feed staging dir setup - initializeStagingDirs(); - Thread.sleep(1000); - } - - private void initializeStagingDirs() throws Exception { - final InputStream inputStream = getClass().getResourceAsStream("/config/feed/hive-table-feed.xml"); - Feed tableFeed = (Feed) EntityType.FEED.getUnmarshaller().unmarshal(inputStream); - getStore().publish(EntityType.FEED, tableFeed); - } - - @Test - public void testProcessLogs() throws IOException, FalconException, InterruptedException { - - Assert.assertTrue(fs.exists(instanceLogPath)); - Assert.assertTrue(fs.exists(instanceLogPath1)); - Assert.assertTrue(fs.exists(instanceLogPath2)); - Assert.assertTrue(fs.exists(instanceLogPath3)); - - AbstractCleanupHandler processCleanupHandler = new ProcessCleanupHandler(); - processCleanupHandler.cleanup(); - - Assert.assertFalse(fs.exists(instanceLogPath)); - Assert.assertFalse(fs.exists(instanceLogPath1)); - Assert.assertFalse(fs.exists(instanceLogPath2)); - Assert.assertFalse(fs.exists(instanceLogPath5)); - Assert.assertTrue(fs.exists(instanceLogPath3)); - } - - @Test - public void testFeedLogs() throws IOException, FalconException, InterruptedException { - - Assert.assertTrue(fs.exists(feedInstanceLogPath)); - Assert.assertTrue(tfs.exists(feedInstanceLogPath)); - Assert.assertTrue(fs.exists(feedInstanceLogPath1)); - Assert.assertTrue(tfs.exists(feedInstanceLogPath1)); - - AbstractCleanupHandler feedCleanupHandler = new FeedCleanupHandler(); - feedCleanupHandler.cleanup(); - - Assert.assertFalse(fs.exists(feedInstanceLogPath)); - Assert.assertFalse(tfs.exists(feedInstanceLogPath)); - Assert.assertTrue(fs.exists(feedInstanceLogPath1)); - Assert.assertTrue(tfs.exists(feedInstanceLogPath1)); - } -}
