/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.fluentjob.api.action;

import com.google.common.collect.ImmutableList;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import java.util.List;

/**
 * A class representing the streaming information within a {@link MapReduceAction}.
 *
 * Instances of this class should be built using the builder {@link StreamingBuilder}.
 *
 * The properties of the builder can only be set once, an attempt to set them a second time will trigger
 * an {@link IllegalStateException}.
 *
 * Builder instances can be used to build several elements, although properties already set cannot be changed after
 * a call to {@link StreamingBuilder#build} either.
 */
// NOTE(review): the two annotations below were garbled to "[email protected]" in the patch text;
// reconstructed from the imports above — confirm against the original commit.
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Streaming {
    // All fields are immutable, so instances of this class are safe to share.
    private final String mapper;
    private final String reducer;
    private final String recordReader;
    private final ImmutableList<String> recordReaderMappings;
    private final ImmutableList<String> envs;

    // Package-private on purpose: instances are created by StreamingBuilder only.
    Streaming(final String mapper,
              final String reducer,
              final String recordReader,
              final ImmutableList<String> recordReaderMappings,
              final ImmutableList<String> envs) {
        this.mapper = mapper;
        this.reducer = reducer;
        this.recordReader = recordReader;
        this.recordReaderMappings = recordReaderMappings;
        this.envs = envs;
    }

    /**
     * Returns the mapper of this {@link Streaming} object.
     * @return The mapper of this {@link Streaming} object.
     */
    public String getMapper() {
        return mapper;
    }

    /**
     * Returns the reducer of this {@link Streaming} object.
     * @return The reducer of this {@link Streaming} object.
     */
    public String getReducer() {
        return reducer;
    }

    /**
     * Returns the record reader of this {@link Streaming} object.
     * @return The record reader of this {@link Streaming} object.
     */
    public String getRecordReader() {
        return recordReader;
    }

    /**
     * Returns the record reader mappings of this {@link Streaming} object as a list.
     * The returned list is immutable.
     * @return The record reader mappings of this {@link Streaming} object as a list.
     */
    public List<String> getRecordReaderMappings() {
        return recordReaderMappings;
    }

    /**
     * Returns the environment variables of this {@link Streaming} object as a list.
     * The returned list is immutable.
     * @return The environment variables of this {@link Streaming} object as a list.
     */
    public List<String> getEnvs() {
        return envs;
    }
}
http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/StreamingBuilder.java ---------------------------------------------------------------------- diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/StreamingBuilder.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/StreamingBuilder.java new file mode 100644 index 0000000..ab52969 --- /dev/null +++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/StreamingBuilder.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.fluentjob.api.action; + +import com.google.common.collect.ImmutableList; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.oozie.fluentjob.api.ModifyOnce; + +/** + * A builder class for {@link Streaming}. + * + * The properties of the builder can only be set once, an attempt to set them a second time will trigger + * an {@link IllegalStateException}. 
The properties that are lists are an exception to this rule, of course multiple + * elements can be added / removed. + * + * Builder instances can be used to build several elements, although properties already set cannot be changed after + * a call to {@link StreamingBuilder#build} either. + */ [email protected] [email protected] +public class StreamingBuilder implements Builder<Streaming> { + private final ModifyOnce<String> mapper; + private final ModifyOnce<String> reducer; + private final ModifyOnce<String> recordReader; + private final ImmutableList.Builder<String> recordReaderMappings; + private final ImmutableList.Builder<String> envs; + + /** + * Creates a new {@link StreamingBuilder}. + */ + public StreamingBuilder() { + mapper = new ModifyOnce<>(); + reducer = new ModifyOnce<>(); + recordReader = new ModifyOnce<>(); + + recordReaderMappings = new ImmutableList.Builder<>(); + envs = new ImmutableList.Builder<>(); + } + + /** + * Registers a mapper with this builder. + * @param mapper The mapper to register with this builder. + * @return This builder. + */ + public StreamingBuilder withMapper(final String mapper) { + this.mapper.set(mapper); + return this; + } + + /** + * Registers a reducer with this builder. + * @param reducer The reducer to register with this builder. + * @return This builder. + */ + public StreamingBuilder withReducer(final String reducer) { + this.reducer.set(reducer); + return this; + } + + /** + * Registers a record reader with this builder. + * @param recordReader The record reader to register with this builder. + * @return This builder. + */ + public StreamingBuilder withRecordReader(final String recordReader) { + this.recordReader.set(recordReader); + return this; + } + + /** + * Registers a record reader mapping with this builder. + * @param recordReaderMapping The record reader mapping to register with this builder. + * @return This builder. 
+ */ + public StreamingBuilder withRecordReaderMapping(final String recordReaderMapping) { + this.recordReaderMappings.add(recordReaderMapping); + return this; + } + + /** + * Registers an environment variable with this builder. + * @param env The environment variable to register with this builder. + * @return This builder. + */ + public StreamingBuilder withEnv(final String env) { + this.envs.add(env); + return this; + } + + /** + * Creates a new {@link Streaming} object with the properties stores in this builder. + * The new {@link Streaming} object is independent of this builder and the builder can be used to build + * new instances. + * @return A new {@link Streaming} object with the properties stored in this builder. + */ + @Override + public Streaming build() { + return new Streaming(mapper.get(), reducer.get(), recordReader.get(), recordReaderMappings.build(), envs.build()); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowAction.java ---------------------------------------------------------------------- diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowAction.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowAction.java new file mode 100644 index 0000000..238bc13 --- /dev/null +++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/action/SubWorkflowAction.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.fluentjob.api.action;

import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

import java.util.Map;

/**
 * A class representing the Oozie subworkflow action.
 * Instances of this class should be built using the builder {@link SubWorkflowActionBuilder}.
 *
 * The properties of the builder can only be set once, an attempt to set them a second time will trigger
 * an {@link IllegalStateException}.
 *
 * Builder instances can be used to build several elements, although properties already set cannot be changed after
 * a call to {@link SubWorkflowActionBuilder#build} either.
 */
// NOTE(review): the two annotations below were garbled to "[email protected]" in the patch text;
// reconstructed from the imports above — confirm against the original commit.
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SubWorkflowAction extends Node {
    private final String appPath;
    private final boolean propagateConfiguration;
    private final ImmutableMap<String, String> configuration;

    // Package-private on purpose: instances are created by SubWorkflowActionBuilder only.
    SubWorkflowAction(final Node.ConstructionData constructionData,
                      final String appPath,
                      final boolean propagateConfiguration,
                      final ImmutableMap<String, String> configuration) {
        super(constructionData);

        this.appPath = appPath;
        this.propagateConfiguration = propagateConfiguration;
        this.configuration = configuration;
    }

    /**
     * Returns the path to the application definition (usually workflow.xml) of the subworkflow.
     * @return The path to the application definition (usually workflow.xml) of the subworkflow.
     */
    public String getAppPath() {
        return appPath;
    }

    /**
     * Returns whether the configuration of the main workflow should propagate to the subworkflow.
     * @return {@code true} if the configuration of the main workflow should propagate to the subworkflow;
     *         {@code false} otherwise.
     */
    public boolean isPropagatingConfiguration() {
        return propagateConfiguration;
    }

    /**
     * Returns the value associated with the provided configuration property name.
     * @param property The name of the configuration property for which the value will be returned.
     * @return The value associated with the provided configuration property name, or {@code null} if it is not set.
     */
    public String getConfigProperty(final String property) {
        return configuration.get(property);
    }

    /**
     * Returns an immutable map of the configuration key-value pairs stored in this {@link SubWorkflowAction} object.
     * @return An immutable map of the configuration key-value pairs stored in this {@link SubWorkflowAction} object.
     */
    public Map<String, String> getConfiguration() {
        return configuration;
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.fluentjob.api.action;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.oozie.fluentjob.api.ModifyOnce;

import java.util.LinkedHashMap;
import java.util.Map;

/**
 * A builder class for {@link SubWorkflowAction}.
 *
 * The properties of the builder can only be set once, an attempt to set them a second time will trigger
 * an {@link IllegalStateException}. The properties that are lists are an exception to this rule, of course multiple
 * elements can be added / removed.
 *
 * Builder instances can be used to build several elements, although properties already set cannot be changed after
 * a call to {@link SubWorkflowActionBuilder#build} either.
 */
// NOTE(review): the two annotations below were garbled to "[email protected]" in the patch text;
// reconstructed from the imports above — confirm against the original commit.
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class SubWorkflowActionBuilder
        extends NodeBuilderBaseImpl<SubWorkflowActionBuilder> implements Builder<SubWorkflowAction> {
    private final ModifyOnce<String> appPath;
    private final ModifyOnce<Boolean> propagateConfiguration;
    // LinkedHashMap preserves the insertion order of the configuration properties.
    private final Map<String, ModifyOnce<String>> configuration;

    /**
     * Creates and returns an empty builder.
     * @return An empty builder.
     */
    public static SubWorkflowActionBuilder create() {
        final ModifyOnce<String> appPath = new ModifyOnce<>();
        final ModifyOnce<Boolean> propagateConfiguration = new ModifyOnce<>(false);
        final Map<String, ModifyOnce<String>> configuration = new LinkedHashMap<>();

        return new SubWorkflowActionBuilder(null, appPath, propagateConfiguration, configuration);
    }

    /**
     * Create and return a new {@link SubWorkflowActionBuilder} that is based on an already built
     * {@link SubWorkflowAction} object. The properties of the builder will initially be the same as those of the
     * provided {@link SubWorkflowAction} object, but it is possible to modify them once.
     * @param action The {@link SubWorkflowAction} object on which this {@link SubWorkflowActionBuilder} will be based.
     * @return A new {@link SubWorkflowActionBuilder} that is based on a previously built {@link SubWorkflowAction} object.
     */
    public static SubWorkflowActionBuilder createFromExistingAction(final SubWorkflowAction action) {
        final ModifyOnce<String> appPath = new ModifyOnce<>(action.getAppPath());
        final ModifyOnce<Boolean> propagateConfiguration = new ModifyOnce<>(action.isPropagatingConfiguration());
        final Map<String, ModifyOnce<String>> configuration =
                ActionAttributesBuilder.convertToModifyOnceMap(action.getConfiguration());

        return new SubWorkflowActionBuilder(action, appPath, propagateConfiguration, configuration);
    }

    // Package-private: use the static factory methods create / createFromExistingAction instead.
    SubWorkflowActionBuilder(final SubWorkflowAction action,
                             final ModifyOnce<String> appPath,
                             final ModifyOnce<Boolean> propagateConfiguration,
                             final Map<String, ModifyOnce<String>> configuration) {
        super(action);

        this.appPath = appPath;
        this.propagateConfiguration = propagateConfiguration;
        this.configuration = configuration;
    }

    /**
     * Registers the path to the application definition (usually workflow.xml) of the subworkflow.
     * @param appPath HDFS application path
     * @return This builder.
     */
    public SubWorkflowActionBuilder withAppPath(final String appPath) {
        this.appPath.set(appPath);
        return this;
    }

    /**
     * Registers that the configuration of the main workflow should propagate to the subworkflow.
     * @return This builder.
     */
    public SubWorkflowActionBuilder withPropagatingConfiguration() {
        this.propagateConfiguration.set(true);
        return this;
    }

    /**
     * Registers that the configuration of the main workflow should NOT propagate to the subworkflow.
     * @return This builder.
     */
    public SubWorkflowActionBuilder withoutPropagatingConfiguration() {
        this.propagateConfiguration.set(false);
        return this;
    }

    /**
     * Registers a configuration property (a key-value pair) with this builder. If the provided key has already been
     * set on this builder, an exception is thrown. Setting a key to null means deleting it.
     * @param key The name of the property to set.
     * @param value The value of the property to set.
     * @return this
     * @throws IllegalStateException if the provided key has already been set on this builder.
     */
    public SubWorkflowActionBuilder withConfigProperty(final String key, final String value) {
        ModifyOnce<String> mappedValue = this.configuration.get(key);

        if (mappedValue == null) {
            mappedValue = new ModifyOnce<>(value);
            this.configuration.put(key, mappedValue);
        }

        // The unconditional set() records the one allowed modification; a value passed to the
        // ModifyOnce constructor presumably counts only as a default (see create(), which constructs
        // with `false` and later sets `true`) — NOTE(review): confirm against ModifyOnce.
        mappedValue.set(value);

        return this;
    }

    /**
     * Creates a new {@link SubWorkflowAction} object with the properties stored in this builder.
     * The new {@link SubWorkflowAction} object is independent of this builder and the builder can be used to build
     * new instances.
     * @return A new {@link SubWorkflowAction} object with the properties stored in this builder.
     */
    @Override
    public SubWorkflowAction build() {
        final Node.ConstructionData constructionData = getConstructionData();

        final SubWorkflowAction instance = new SubWorkflowAction(
                constructionData,
                appPath.get(),
                propagateConfiguration.get(),
                ActionAttributesBuilder.convertToConfigurationMap(configuration));

        addAsChildToAllParents(instance);

        return instance;
    }

    @Override
    protected SubWorkflowActionBuilder getRuntimeSelfReference() {
        return this;
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.fluentjob.api.action;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * A class representing the touchz command of {@link FSAction}.
 * Immutable: holds only the target path of the touch operation.
 */
// NOTE(review): the two annotations below were garbled to "[email protected]" in the patch text;
// reconstructed from the imports above — confirm against the original commit.
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class Touchz {
    private final String path;

    /**
     * Creates a new {@link Touchz} object.
     * @param path The path of the file that the touch operation will be executed on.
     */
    public Touchz(final String path) {
        this.path = path;
    }

    /**
     * Returns the path of the file that the touch operation will be executed on.
     * @return The path of the file that the touch operation will be executed on.
     */
    public String getPath() {
        return path;
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.fluentjob.api.dag;

import org.apache.oozie.fluentjob.api.Condition;

import java.util.Collection;
import java.util.Iterator;
import java.util.Objects;

/**
 * This is a class bundling together a {@link NodeBase} object and a {@link Condition} object. There are no restrictions
 * as to whether the node should be a parent with an outgoing conditional path or a child with an incoming conditional path.
 */
public class DagNodeWithCondition {
    private final NodeBase node;
    private final Condition condition;

    /**
     * Removes the first {@link DagNodeWithCondition} object from a collection that has the provided node as its node.
     * If there is no such object in the collection, this method returns false.
     * @param collection The collection from which to remove an element.
     * @param node The node to remove together with its condition.
     * @return {@code true} if an element was removed; {@code false} if no matching element was contained in the collection.
     */
    public static boolean removeFromCollection(final Collection<DagNodeWithCondition> collection, final NodeBase node) {
        // Iterator-based removal stops at the FIRST match, as documented. The previous
        // implementation kept overwriting the candidate and therefore removed the LAST
        // match, while also always scanning the whole collection.
        final Iterator<DagNodeWithCondition> iterator = collection.iterator();
        while (iterator.hasNext()) {
            if (node.equals(iterator.next().getNode())) {
                iterator.remove();
                return true;
            }
        }

        return false;
    }

    /**
     * Creates a new {@link DagNodeWithCondition} object.
     * @param node A {@link NodeBase} object.
     * @param condition A {@link Condition} object.
     */
    public DagNodeWithCondition(final NodeBase node,
                                final Condition condition) {
        this.node = node;
        this.condition = condition;
    }

    /**
     * Returns the {@link NodeBase} object of this {@link DagNodeWithCondition}.
     * @return The {@link NodeBase} object of this {@link DagNodeWithCondition}.
     */
    public NodeBase getNode() {
        return node;
    }

    /**
     * Returns the {@link Condition} object of this {@link DagNodeWithCondition}.
     * @return The {@link Condition} object of this {@link DagNodeWithCondition}.
     */
    public Condition getCondition() {
        return condition;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) {
            return true;
        }

        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        final DagNodeWithCondition that = (DagNodeWithCondition) o;

        // Objects.equals is null-safe and replaces the hand-rolled ternary checks.
        return Objects.equals(node, that.node) && Objects.equals(condition, that.condition);
    }

    @Override
    public int hashCode() {
        // Kept as the original 31-based formula (same values as before), expressed
        // with the null-safe Objects.hashCode.
        int result = Objects.hashCode(node);
        result = 31 * result + Objects.hashCode(condition);

        return result;
    }
}
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.oozie.fluentjob.api.dag;

import com.google.common.base.Preconditions;
import org.apache.oozie.fluentjob.api.Condition;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * A class representing decision nodes in an Oozie workflow definition DAG. These nodes are generated automatically,
 * the end user should not need to use this class directly.
 */
public class Decision extends NodeBase {
    // A decision node has exactly one parent but multiple conditional children;
    // the default child is stored separately from the condition-mapped children.
    private NodeBase parent;
    private final List<DagNodeWithCondition> childrenWithConditions;
    private NodeBase defaultChild;

    /**
     * Create a new decision node with the given name.
     * @param name The name of the new decision node.
     */
    public Decision(final String name) {
        super(name);
        this.parent = null;
        this.childrenWithConditions = new ArrayList<>();
    }

    /**
     * Returns the parent of this node.
     * @return The parent of this node.
     */
    public NodeBase getParent() {
        return parent;
    }

    /**
     * Adds the provided node as a parent of this node.
     * @param parent The new parent of this node.
     * @throws IllegalStateException if this node already has a parent.
     */
    @Override
    public void addParent(final NodeBase parent) {
        Preconditions.checkState(this.parent == null, "Decision nodes cannot have multiple parents.");

        this.parent = parent;
        // Keep the relationship symmetric: register this node as the parent's child too.
        this.parent.addChild(this);
    }

    /**
     * Adds the provided node as a conditional parent of this node.
     * @param parent The new conditional parent of this node.
     * @param condition The condition which must be true in addition to the parent completing successfully for this node
     *                  to be executed.
     * @throws IllegalStateException if this node already has a parent.
     */
    @Override
    public void addParentWithCondition(final Decision parent, final Condition condition) {
        Preconditions.checkState(this.parent == null, "Decision nodes cannot have multiple parents.");

        this.parent = parent;
        parent.addChildWithCondition(this, condition);
    }

    /**
     * Adds the provided node as the default conditional parent of this node.
     * @param parent The new conditional parent of this node.
     * @throws IllegalStateException if this node already has a parent.
     */
    @Override
    public void addParentDefaultConditional(final Decision parent) {
        Preconditions.checkState(this.parent == null, "Decision nodes cannot have multiple parents.");

        this.parent = parent;
        parent.addDefaultChild(this);
    }

    @Override
    public void removeParent(final NodeBase parent) {
        // Note: removing a null parent when this node has no parent is accepted
        // (null == null passes the check); clearParents relies on this.
        Preconditions.checkArgument(this.parent == parent, "Trying to remove a nonexistent parent.");

        if (this.parent != null) {
            this.parent.removeChild(this);
        }

        this.parent = null;
    }

    @Override
    public void clearParents() {
        // A Decision has at most one parent, so clearing means removing that single parent.
        removeParent(parent);
    }

    @Override
    public List<NodeBase> getChildren() {
        final List<NodeBase> results = new ArrayList<>();

        // Strip the conditions; getChildrenWithConditions already includes the default child.
        for (final DagNodeWithCondition nodeWithCondition : getChildrenWithConditions()) {
            results.add(nodeWithCondition.getNode());
        }

        return Collections.unmodifiableList(results);
    }

    /**
     * Returns the children of this {@link Decision} node together with their conditions (all children are conditional),
     * including the default child.
     * @return The conditional children of this {@link Decision} node together with their conditions,
     *         including the default child.
     */
    public List<DagNodeWithCondition> getChildrenWithConditions() {
        final List<DagNodeWithCondition> results = new ArrayList<>(childrenWithConditions);

        // The default child is represented by the special "default" condition and always comes last.
        if (defaultChild != null) {
            results.add(new DagNodeWithCondition(defaultChild, Condition.defaultCondition()));
        }

        return Collections.unmodifiableList(results);
    }

    /**
     * Returns the default child of this {@code Decision} node.
     * @return The default child of this {@code Decision} node.
     */
    public NodeBase getDefaultChild() {
        return defaultChild;
    }

    @Override
    protected void addChild(final NodeBase child) {
        // Children of a Decision must always carry a condition (or be the default child).
        throw new IllegalStateException("Decision nodes cannot have normal children.");
    }

    // Adds a conditional child; a child with the special "default" condition is routed
    // to the dedicated default-child slot instead of the condition-mapped list.
    void addChildWithCondition(final NodeBase child, final Condition condition) {
        if (condition.isDefault()) {
            addDefaultChild(child);
        }
        else {
            this.childrenWithConditions.add(new DagNodeWithCondition(child, condition));
        }
    }

    // Sets the default child; only one default child is allowed per decision node.
    void addDefaultChild(final NodeBase child) {
        Preconditions.checkState(defaultChild == null, "Trying to add a default child to a Decision node that already has one.");

        defaultChild = child;
    }

    @Override
    protected void removeChild(final NodeBase child) {
        if (defaultChild == child) {
            defaultChild = null;
        }
        else {
            final int index = indexOfNodeBaseInChildrenWithConditions(child);

            Preconditions.checkArgument(index >= 0, "Trying to remove a nonexistent child.");

            this.childrenWithConditions.remove(index);
        }
    }

    // Returns the index of the entry wrapping the given child (identity comparison),
    // or -1 if the child is not among the conditional children.
    private int indexOfNodeBaseInChildrenWithConditions(final NodeBase child) {
        for (int i = 0; i < this.childrenWithConditions.size(); ++i) {
            if (child == this.childrenWithConditions.get(i).getNode()) {
                return i;
            }
        }

        return -1;
    }
}
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.fluentjob.api.dag; + +/** + * This class represents a joining point where two or more (but not necessarily all) conditional branches originating + * from the same decision node meet. + * This class will NOT be mapped to JAXB classes and XML as decision nodes don't need to be joined is Oozie, this class + * only exists to make implementing the algorithms easier. + */ +public class DecisionJoin extends JoiningNodeBase<Decision> { + + /** + * Creates a new {@link DecisionJoin} object. + * @param name The name of the new decision object. + * @param decision The {@link Decision} node that this {@link DecisionJoin} node closes. 
+ */ + public DecisionJoin(final String name, final Decision decision) { + super(name, decision); + } + + public NodeBase getFirstNonDecisionJoinDescendant() { + NodeBase descendant = getChild(); + + while (descendant != null) { + if (!(descendant instanceof DecisionJoin)) { + return descendant; + } + + descendant = ((DecisionJoin) descendant).getChild(); + } + + return null; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/End.java ---------------------------------------------------------------------- diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/End.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/End.java new file mode 100644 index 0000000..0a28eb3 --- /dev/null +++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/End.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.fluentjob.api.dag; + +import com.google.common.base.Preconditions; +import org.apache.oozie.fluentjob.api.Condition; + +import java.util.Arrays; +import java.util.List; + +/** + * A class representing end nodes in an Oozie workflow definition DAG. These nodes are generated automatically, + * the end user should not need to use this class directly. + */ +public class End extends NodeBase { + private NodeBase parent; + + /** + * Create a new end node with the given name. + * @param name The name of the new end node. + */ + public End(final String name) { + super(name); + } + + /** + * Returns the parent of this node. + * @return The parent of this node. + */ + public NodeBase getParent() { + return parent; + } + + /** + * Adds the provided node as a parent of this node. + * @param parent The new parent of this node. + * @throws IllegalStateException if this node already has a parent. + */ + @Override + public void addParent(final NodeBase parent) { + Preconditions.checkState(this.parent == null, "End nodes cannot have multiple parents."); + + this.parent = parent; + parent.addChild(this); + } + + /** + * Adds the provided node as a conditional parent of this node. + * @param parent The new conditional parent of this node. + * @param condition The condition which must be true in addition the parent completing successfully for this node + * to be executed. + * @throws IllegalStateException if this node already has a parent. + */ + @Override + public void addParentWithCondition(final Decision parent, final Condition condition) { + Preconditions.checkState(this.parent == null, "End nodes cannot have multiple parents."); + + this.parent = parent; + parent.addChildWithCondition(this, condition); + } + + /** + * Adds the provided node as the default conditional parent of this node. + * @param parent The new conditional parent of this node. + * @throws IllegalStateException if this node already has a parent. 
+ */ + @Override + public void addParentDefaultConditional(Decision parent) { + Preconditions.checkState(this.parent == null, "End nodes cannot have multiple parents."); + + this.parent = parent; + parent.addDefaultChild(this); + } + + @Override + public void removeParent(final NodeBase parent) { + Preconditions.checkArgument(this.parent == parent, "Trying to remove a nonexistent parent."); + + if (this.parent != null) { + this.parent.removeChild(this); + } + + this.parent = null; + } + + @Override + public void clearParents() { + removeParent(parent); + } + + @Override + public List<NodeBase> getChildren() { + return Arrays.asList(); + } + + @Override + protected void addChild(final NodeBase child) { + throw new IllegalStateException("End nodes cannot have children."); + } + + @Override + protected void removeChild(final NodeBase child) { + throw new IllegalStateException("End nodes cannot have children."); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/ExplicitNode.java ---------------------------------------------------------------------- diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/ExplicitNode.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/ExplicitNode.java new file mode 100644 index 0000000..ae16a53 --- /dev/null +++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/ExplicitNode.java @@ -0,0 +1,150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.fluentjob.api.dag; + +import com.google.common.base.Preconditions; +import org.apache.oozie.fluentjob.api.action.Node; +import org.apache.oozie.fluentjob.api.Condition; + +import java.util.Arrays; +import java.util.List; + +/** + * A class representing action nodes in an Oozie workflow definition DAG. These are the nodes in the intermediate graph + * representation that correspond to the nodes that are explicitly defined by the user. + */ +public class ExplicitNode extends NodeBase { + private NodeBase parent; + private NodeBase child; + private final Node realNode; + + /** + * Create a new explicit node with the given name. + * @param name The name of the new explicit node. + * @param realNode The API level {@link Node} object defined by the end user. + */ + public ExplicitNode(final String name, final Node realNode) { + super(name); + this.realNode = realNode; + } + + /** + * Returns the API level {@link Node} object defined by the end user. + * @return The API level {@link Node} object defined by the end user. + */ + public Node getRealNode() { + return realNode; + } + + /** + * Returns the parent of this node. + * @return The parent of this node. + */ + public NodeBase getParent() { + return parent; + } + + /** + * Returns the child of this node. + * @return The child of this node. + */ + public NodeBase getChild() { + return child; + } + + /** + * Adds the provided node as a parent of this node. + * @param parent The new parent of this node. + * @throws IllegalStateException if this node already has a parent. 
+ */ + @Override + public void addParent(final NodeBase parent) { + Preconditions.checkState(this.parent == null, "An explicit node cannot have multiple parents."); + + this.parent = parent; + parent.addChild(this); + } + + /** + * Adds the provided node as a conditional parent of this node. + * @param parent The new conditional parent of this node. + * @param condition The condition which must be true in addition the parent completing successfully for this node + * to be executed. + * @throws IllegalStateException if this node already has a parent. + */ + @Override + public void addParentWithCondition(final Decision parent, final Condition condition) { + Preconditions.checkState(this.parent == null, "An explicit node cannot have multiple parents."); + + this.parent = parent; + parent.addChildWithCondition(this, condition); + } + + /** + * Adds the provided node as the default conditional parent of this node. + * @param parent The new conditional parent of this node. + * @throws IllegalStateException if this node already has a parent. 
+ */ + @Override + public void addParentDefaultConditional(Decision parent) { + Preconditions.checkState(this.parent == null, "An explicit node cannot have multiple parents."); + + this.parent = parent; + parent.addDefaultChild(this); + } + + @Override + public void removeParent(final NodeBase parent) { + Preconditions.checkArgument(this.parent == parent, "Trying to remove a nonexistent parent."); + + if (this.parent != null) { + this.parent.removeChild(this); + } + + this.parent = null; + } + + @Override + public void clearParents() { + removeParent(parent); + } + + @Override + public List<NodeBase> getChildren() { + if (child == null) { + return Arrays.asList(); + } + + return Arrays.asList(child); + } + + @Override + protected void addChild(final NodeBase child) { + Preconditions.checkState(this.child == null, "Normal nodes cannot have multiple children."); + + this.child = child; + } + + @Override + protected void removeChild(final NodeBase child) { + Preconditions.checkArgument(this.child == child, "Trying to remove a nonexistent child."); + + this.child = null; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Fork.java ---------------------------------------------------------------------- diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Fork.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Fork.java new file mode 100644 index 0000000..2657e63 --- /dev/null +++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Fork.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.fluentjob.api.dag; + +import com.google.common.base.Preconditions; +import org.apache.oozie.fluentjob.api.Condition; +import org.apache.oozie.fluentjob.api.ModifyOnce; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A class representing end nodes in an Oozie workflow definition DAG. These nodes are generated automatically, + * the end user should not need to use this class directly. + */ +public class Fork extends NodeBase { + private NodeBase parent; + private final List<NodeBase> children; + + private final ModifyOnce<Join> closingJoin; + + /** + * Create a new fork node with the given name. + * @param name The name of the new fork node. + */ + public Fork(final String name) { + super(name); + + this.parent = null; + this.children = new ArrayList<>(); + this.closingJoin = new ModifyOnce<>(); + } + + /** + * Returns the parent of this node. + * @return The parent of this node. + */ + public NodeBase getParent() { + return parent; + } + + /** + * Adds the provided node as a parent of this node. + * @param parent The new parent of this node. + * @throws IllegalStateException if this node already has a parent. 
+ */ + @Override + public void addParent(final NodeBase parent) { + Preconditions.checkState(this.parent == null, "Fork nodes cannot have multiple parents."); + + this.parent = parent; + parent.addChild(this); + } + + /** + * Adds the provided node as a conditional parent of this node. + * @param parent The new conditional parent of this node. + * @param condition The condition which must be true in addition the parent completing successfully for this node + * to be executed. + * @throws IllegalStateException if this node already has a parent. + */ + @Override + public void addParentWithCondition(final Decision parent, final Condition condition) { + Preconditions.checkState(this.parent == null, "Fork nodes cannot have multiple parents."); + + this.parent = parent; + parent.addChildWithCondition(this, condition); + } + + /** + * Adds the provided node as the default conditional parent of this node. + * @param parent The new conditional parent of this node. + * @throws IllegalStateException if this node already has a parent. 
+ */ + @Override + public void addParentDefaultConditional(Decision parent) { + Preconditions.checkState(this.parent == null, "Fork nodes cannot have multiple parents."); + + this.parent = parent; + parent.addDefaultChild(this); + } + + @Override + public void removeParent(final NodeBase parent) { + Preconditions.checkArgument(this.parent == parent, "Trying to remove a nonexistent parent."); + + if (this.parent != null) { + this.parent.removeChild(this); + } + + this.parent = null; + } + + @Override + public void clearParents() { + removeParent(parent); + } + + @Override + public List<NodeBase> getChildren() { + return Collections.unmodifiableList(new ArrayList<>(children)); + } + + Join getClosingJoin() { + return closingJoin.get(); + } + + boolean isClosed() { + return getClosingJoin() != null; + } + + void close(final Join join) { + closingJoin.set(join); + } + + @Override + protected void addChild(final NodeBase child) { + children.add(child); + } + + @Override + protected void removeChild(final NodeBase child) { + Preconditions.checkArgument(this.children.remove(child),"Trying to remove a nonexistent child."); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/8a0a6487/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Graph.java ---------------------------------------------------------------------- diff --git a/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Graph.java b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Graph.java new file mode 100644 index 0000000..08cb896 --- /dev/null +++ b/fluent-job/fluent-job-api/src/main/java/org/apache/oozie/fluentjob/api/dag/Graph.java @@ -0,0 +1,872 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.fluentjob.api.dag; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import org.apache.oozie.fluentjob.api.action.Node; +import org.apache.oozie.fluentjob.api.workflow.Credentials; +import org.apache.oozie.fluentjob.api.workflow.Global; +import org.apache.oozie.fluentjob.api.workflow.Parameters; +import org.apache.oozie.fluentjob.api.workflow.Workflow; +import org.apache.oozie.fluentjob.api.Condition; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * The class holding the intermediate representation of the workflow. This is where the API level {@link Workflow} + * object is transformed to an intermediate graph (an object of this class), and all control nodes are generated. + * This graph is later further transformed to JAXB objects and to xml. + * + * The conversion from the API level {@link Workflow} object to the intermediate graph is as follows: + * We take the nodes in topological order, meaning every node is processed after all of its dependencies + * have been processed. 
There are two main possibilities when processing a node: + * - the node has zero or one parent + * - the node has at least two parents. + * + * In the first case, we simply add the converted node as a child to its parent (or the start node if there are none), + * possibly inserting a fork if the parent already has children, or using a pre-existing fork if a parent already + * has one. + * + * In the second case, we have to insert a join. We first check if we can join all incoming paths in a single join + * node or if we have to split them up because they come from multiple embedded forks. It is also possible that some + * incoming paths originate from the same fork but that fork has other outgoing paths as well. In that case we split + * the fork up into multiple embedded forks. + * + * After this, we examine all paths that we are going to join and look for side branches that lead out of the + * fork / join block, violating Oozie's constraints. If these are non-conditional branches, we simply cut them down + * from their original parents and put them under the new join (and possibly under a fork), and make them siblings + * of whatever nodes originally come after the join. This way all original dependencies are preserved, as the original + * parents will still be ancestors (though indirectly) to the relocated nodes, but new dependencies are introduced. + * This preserves the correctness of the workflow but decreases its parallelism. This is unfortunate but Oozie's graph + * format is more restrictive than a general DAG, so we have to accept it. + * + * If the side branches are conditional, we cut above the decision node and insert a join there. We reinsert the + * decision node under the new join. This is very similar to the handling of non-conditional paths, but it + * decreases parallelism even more (we cut one level higher). + * A problem occurs if two or more decision nodes come right after the fork that we want to close. 
If we cut above + * the decision nodes as usual we gain nothing, because we insert a join and a fork and arrive at the same situation + * as before - multiple decision nodes under a fork. Currently, we are not able to handle this situation and we throw + * an exception. + */ +public class Graph { + private final String name; + private final Start start = new Start("start"); + private final End end = new End("end"); + private final Parameters parameters; + private final Global global; + private final Credentials credentials; + private final Map<String, NodeBase> nodesByName = new LinkedHashMap<>(); + private final Map<Fork, Integer> forkNumbers = new HashMap<>(); + private int forkCounter = 1; + + private final Map<NodeBase, Decision> originalParentToCorrespondingDecision = new HashMap<>(); + private final Map<Decision, Integer> closedPathsOfDecisionNodes = new HashMap<>(); + private int decisionCounter = 1; + private int decisionJoinCounter = 1; + + /** + * Nodes that have a join downstream to them are closed, they should never get new children. + */ + private final Map<NodeBase, Join> closingJoins = new HashMap<>(); + + /** + * Creates a new {@link Graph} object transforming the graph of the provided {@link Workflow} object + * into an intermediate level graph. + * @param workflow The {@link Workflow} object to transform. + */ + public Graph(final Workflow workflow) { + this.name = workflow.getName(); + this.parameters = workflow.getParameters(); + this.global = workflow.getGlobal(); + this.credentials = workflow.getCredentials(); + + final List<Node> nodesFromRootsToLeaves = getNodesFromRootsToLeaves(workflow); + + storeNode(start); + storeNode(end); + + convert(nodesFromRootsToLeaves); + } + + /** + * Returns the name of this graph. + * @return The name of this graph. 
+ */ + public String getName() { + return name; + } + + public Parameters getParameters() { + return parameters; + } + + public Global getGlobal() { + return global; + } + + /** + * Returns the start node of this graph. + * @return The start node of this graph. + */ + public Start getStart() { + return start; + } + + /** + * Returns the end node of this graph. + * @return The end node of this graph. + */ + public End getEnd() { + return end; + } + + /** + * Returns the node with the given name in this graph if it exists, {@code null} otherwise. + * @param name The name of the node that will be returned. + * @return The node with the given name in this graph if it exists, {@code null} otherwise. + */ + public NodeBase getNodeByName(final String name) { + return nodesByName.get(name); + } + + /** + * Returns a collection of the nodes in this graph. + * @return A collection of the nodes in this graph. + */ + public Collection<NodeBase> getNodes() { + return nodesByName.values(); + } + + private void convert(final List<Node> nodesInTopologicalOrder) { + final Map<Node, NodeBase> nodeToNodeBase = new HashMap<>(); + + for (final Node originalNode : nodesInTopologicalOrder) { + final ExplicitNode convertedNode = new ExplicitNode(originalNode.getName(), originalNode); + nodeToNodeBase.put(originalNode, convertedNode); + storeNode(convertedNode); + + checkAndInsertDecisionNode(originalNode, convertedNode); + + final List<DagNodeWithCondition> mappedParentsWithConditions = findMappedParents(originalNode, nodeToNodeBase); + + handleNodeWithParents(convertedNode, mappedParentsWithConditions); + } + + final List<DagNodeWithCondition> finalNodes = findFinalNodes(); + + handleNodeWithParents(end, finalNodes); + } + + private void checkAndInsertDecisionNode(Node originalNode, ExplicitNode convertedNode) { + if (!originalNode.getChildrenWithConditions().isEmpty()) { + // We insert a decision node below the current convertedNode. 
+ final Decision decision = newDecision(); + decision.addParent(convertedNode); + originalParentToCorrespondingDecision.put(convertedNode, decision); + } + } + + private List<DagNodeWithCondition> findMappedParents(Node originalNode, Map<Node, NodeBase> nodeToNodeBase) { + final List<DagNodeWithCondition> mappedParentsWithConditions = new ArrayList<>(); + + for (final Node.NodeWithCondition parentNodeWithCondition : originalNode.getParentsWithConditions()) { + final NodeBase mappedParentNode = nodeToNodeBase.get(parentNodeWithCondition.getNode()); + final Condition condition = parentNodeWithCondition.getCondition(); + final DagNodeWithCondition parentNodeBaseWithCondition = new DagNodeWithCondition(mappedParentNode, condition); + mappedParentsWithConditions.add(parentNodeBaseWithCondition); + } + + for (final Node parent : originalNode.getParentsWithoutConditions()) { + mappedParentsWithConditions.add(new DagNodeWithCondition(nodeToNodeBase.get(parent), null)); + } + + return mappedParentsWithConditions; + } + + private List<DagNodeWithCondition> findFinalNodes() { + final List<DagNodeWithCondition> finalNodes = new ArrayList<>(); + + for (final NodeBase maybeFinalNode : nodesByName.values()) { + final boolean hasNoChildren = maybeFinalNode.getChildren().isEmpty(); + final boolean isNotEnd = maybeFinalNode != end; + if (hasNoChildren && isNotEnd) { + finalNodes.add(new DagNodeWithCondition(maybeFinalNode, null)); + } + } + + return finalNodes; + } + + private void storeNode(final NodeBase node) { + final String name = node.getName(); + + final boolean isPresent = nodesByName.containsKey(name); + if (isPresent) { + final String errorMessage = String.format("Duplicate name '%s' found in graph '%s'", node.getName(), this.getName()); + throw new IllegalArgumentException(errorMessage); + } + + nodesByName.put(node.getName(), node); + } + + private NodeBase getNewParent(final NodeBase originalParent) { + NodeBase newParent = originalParent; + + if 
(originalParentToCorrespondingDecision.containsKey(newParent)) { + newParent = originalParentToCorrespondingDecision.get(newParent); + } + + newParent = getNearestNonClosedDescendant(newParent); + + return newParent; + } + + private void handleNodeWithParents(final NodeBase node, final List<DagNodeWithCondition> parentsWithConditions) { + // Avoiding adding children to nodes that are inside a closed fork / join pair and to original parents of decision nodes. + final List<DagNodeWithCondition> newParentsWithConditions = new ArrayList<>(); + for (final DagNodeWithCondition parentWithCondition : parentsWithConditions) { + final NodeBase parent = parentWithCondition.getNode(); + final Condition condition = parentWithCondition.getCondition(); + + final NodeBase newParent = getNewParent(parent); + final DagNodeWithCondition newParentWithCondition = new DagNodeWithCondition(newParent, condition); + + if (!newParentsWithConditions.contains(newParentWithCondition)) { + newParentsWithConditions.add(newParentWithCondition); + } + } + + if (newParentsWithConditions.isEmpty()) { + handleSingleParentNode(new DagNodeWithCondition(start, null), node); + } + else if (newParentsWithConditions.size() == 1) { + handleSingleParentNode(newParentsWithConditions.get(0), node); + } + else { + handleMultiParentNodeWithParents(node, newParentsWithConditions); + } + } + + private void handleSingleParentNode(final DagNodeWithCondition parentWithCondition, final NodeBase node) { + addParentWithForkIfNeeded(node, parentWithCondition); + } + + private void handleMultiParentNodeWithParents(final NodeBase node, final List<DagNodeWithCondition> parentsWithConditions) { + final List<PathInformation> paths = new ArrayList<>(); + for (final DagNodeWithCondition parentWithCondition : parentsWithConditions) { + final NodeBase parent = parentWithCondition.getNode(); + paths.add(getPathInfo(parent)); + } + + final BranchingToClose toClose = chooseBranchingToClose(paths); + + // Eliminating redundant 
parents. + if (toClose.isRedundantParent()) { + final List<DagNodeWithCondition> parentsWithoutRedundant = new ArrayList<>(parentsWithConditions); + DagNodeWithCondition.removeFromCollection(parentsWithoutRedundant, toClose.getRedundantParent()); + + handleNodeWithParents(node, parentsWithoutRedundant); + } + else if (toClose.isDecision()) { + insertDecisionJoin(node, parentsWithConditions, toClose); + } + else { + insertJoin(parentsWithConditions, node, toClose); + } + } + + private void insertDecisionJoin(final NodeBase node, + final List<DagNodeWithCondition> parentsWithConditions, + final BranchingToClose branchingToClose) { + final Decision decision = branchingToClose.getDecision(); + final DecisionJoin decisionJoin = newDecisionJoin(decision, branchingToClose.getPaths().size()); + + for (final DagNodeWithCondition parentWithCondition : parentsWithConditions) { + addParentWithForkIfNeeded(decisionJoin, parentWithCondition); + } + + addParentWithForkIfNeeded(node, new DagNodeWithCondition(decisionJoin, null)); + } + + private void insertJoin(final List<DagNodeWithCondition> parentsWithConditions, + final NodeBase node, + final BranchingToClose branchingToClose) { + if (branchingToClose.isSplittingJoinNeeded()) { + // We have to close a subset of the paths. 
+ final List<DagNodeWithCondition> newParentsWithConditions = new ArrayList<>(parentsWithConditions); +// final List<NodeBase> parentsInToClose = new ArrayList<>(); + + for (final PathInformation path : branchingToClose.getPaths()) { +// parentsInToClose.add(path.getBottom()); + DagNodeWithCondition.removeFromCollection(newParentsWithConditions, path.getBottom()); + } + + final Join newJoin = joinPaths(branchingToClose.getFork(), branchingToClose.getPaths()); + + newParentsWithConditions.add(new DagNodeWithCondition(newJoin, null)); + + handleMultiParentNodeWithParents(node, newParentsWithConditions); + } + else { + // There are no intermediary fork / join pairs to insert, we have to join all paths in a single join. + final Join newJoin = joinPaths(branchingToClose.getFork(), branchingToClose.getPaths()); + + if (newJoin != null) { + addParentWithForkIfNeeded(node, new DagNodeWithCondition(newJoin, null)); + } + else { + // Null means a part of the paths was relocated because of a decision node. + handleNodeWithParents(node, parentsWithConditions); + } + } + } + + // Returning null means we have relocated a part of the paths because of decision nodes, so the caller should try + // adding the node again. + private Join joinPaths(final Fork fork, final List<PathInformation> pathsToJoin) { + final Map<PathInformation, Decision> highestDecisionNodes = new LinkedHashMap<>(); + for (final PathInformation path : pathsToJoin) { + for (int ixNodeOnPath = 0; ixNodeOnPath < path.getNodes().size(); ++ixNodeOnPath) { + final NodeBase nodeOnPath = path.getNodes().get(ixNodeOnPath); + + if (nodeOnPath instanceof Decision) { + // Excluding decision nodes where no branch goes out of this fork / join pair. 
+ if (!isDecisionClosed((Decision) nodeOnPath)) { + highestDecisionNodes.put(path, (Decision) nodeOnPath); + } + } + else if (nodeOnPath == fork) { + break; + } + } + } + + if (highestDecisionNodes.isEmpty()) { + return joinPathsWithoutDecisions(fork, pathsToJoin); + } + else { + return joinPathsWithDecisions(fork, pathsToJoin, highestDecisionNodes); + } + } + + private Join joinPathsWithoutDecisions(final Fork fork, final List<PathInformation> pathsToJoin) { + final Set<NodeBase> mainBranchNodes = new LinkedHashSet<>(); + for (final PathInformation pathInformation : pathsToJoin) { + mainBranchNodes.addAll(pathInformation.getNodes()); + } + + // Taking care of side branches. + final Set<NodeBase> closedNodes = new HashSet<>(); + final List<NodeBase> sideBranches = new ArrayList<>(); + for (final PathInformation path : pathsToJoin) { + for (int ixNodeOnPath = 0; ixNodeOnPath < path.getNodes().size(); ++ixNodeOnPath) { + final NodeBase nodeOnPath = path.getNodes().get(ixNodeOnPath); + + if (nodeOnPath == fork) { + break; + } + + if (nodeOnPath instanceof Decision && isDecisionClosed((Decision) nodeOnPath)) { + break; + } + + sideBranches.addAll(cutSideBranches(nodeOnPath, mainBranchNodes)); + closedNodes.add(nodeOnPath); + } + } + + final Join newJoin; + + // Check if we have to divide the fork. + final boolean hasMoreForkedChildren = pathsToJoin.size() < fork.getChildren().size(); + if (hasMoreForkedChildren) { + // Dividing the fork. + newJoin = divideForkAndCloseSubFork(fork, pathsToJoin); + } else { + // We don't divide the fork. + newJoin = newJoin(fork); + + for (final PathInformation path : pathsToJoin) { + addParentWithForkIfNeeded(newJoin, new DagNodeWithCondition(path.getBottom(), null)); + } + } + + // Inserting the side branches under the new join node. + for (final NodeBase sideBranch : sideBranches) { + addParentWithForkIfNeeded(sideBranch, new DagNodeWithCondition(newJoin, null)); + } + + // Marking the nodes as closed. 
for (final NodeBase closedNode : closedNodes) {
            markAsClosed(closedNode, newJoin);
        }

        return newJoin;
    }

    // Relocates the paths hanging below non-closed decision nodes: the decisions are detached, the paths of
    // their parents are joined instead, and the decisions are re-attached under the resulting join.
    // Always returns null, signalling to the caller of joinPaths that the node has to be added again.
    private Join joinPathsWithDecisions(final Fork fork,
                                        final List<PathInformation> pathsToJoin,
                                        final Map<PathInformation, Decision> highestDecisionNodes) {
        final Set<Decision> decisions = new HashSet<>(highestDecisionNodes.values());

        final List<PathInformation> newPaths = new ArrayList<>();
        boolean shouldCloseJoinAndAddOtherDecisionsUnderIt = false;
        for (final Decision decision : decisions) {
            final NodeBase parentOfDecision = decision.getParent();

            if (parentOfDecision == fork) {
                // The decision sits directly under the fork; we cannot lift it any higher.
                shouldCloseJoinAndAddOtherDecisionsUnderIt = true;
                break;
            }

            newPaths.add(getPathInfo(parentOfDecision));
            removeParentWithForkIfNeeded(decision, decision.getParent());
        }

        if (shouldCloseJoinAndAddOtherDecisionsUnderIt) {
            closeJoinAndAddOtherDecisionsUnderIt(fork, decisions);
        }
        else {
            // Keep the paths that had no open decision node on them.
            for (final PathInformation path : pathsToJoin) {
                if (!highestDecisionNodes.containsKey(path)) {
                    newPaths.add(path);
                }
            }

            final Join newJoin = joinPaths(fork, newPaths);

            for (final Decision decision : decisions) {
                addParentWithForkIfNeeded(decision, new DagNodeWithCondition(newJoin, null));
            }
        }

        // Always null: parts of the paths were relocated because of decision nodes, so the caller must retry.
        return null;
    }

    // NOTE(review): unimplemented stub — the parameters are currently unused and this always throws.
    private void closeJoinAndAddOtherDecisionsUnderIt(final Fork fork, final Set<Decision> decisions) {
        // TODO: Either implement it correctly or give a more informative error message.
        throw new IllegalStateException("Conditional paths originating ultimately from the same parallel branching (fork) "
                + "do not converge to the same join.");
    }

    // Records that 'node' is closed by 'join' (inside a completed fork / join pair).
    private void markAsClosed(final NodeBase node, final Join join) {
        closingJoins.put(node, join);
    }

    // Detaches every child of 'node' that is not on the main branch and returns the detached children.
    private List<NodeBase> cutSideBranches(final NodeBase node,
                                           final Set<NodeBase> mainBranchNodes) {
        final List<NodeBase> sideBranches = new ArrayList<>();

        // Closed forks cannot have side branches.
final boolean isClosedFork = node instanceof Fork && ((Fork) node).isClosed();
        if (!isClosedFork) {
            for (final NodeBase childOfForkOrParent : node.getChildren()) {
                if (!mainBranchNodes.contains(childOfForkOrParent)) {
                    removeParentWithForkIfNeeded(childOfForkOrParent, node);
                    sideBranches.add(childOfForkOrParent);
                }
            }
        }

        return sideBranches;
    }

    // Inserts a new sub-fork under 'correspondingFork', moves the given paths under it,
    // and closes the sub-fork with a new join that becomes the parent of all path bottoms.
    private Join divideForkAndCloseSubFork(final Fork correspondingFork,
                                           final List<PathInformation> paths) {
        final Fork newFork = newFork();
        for (final PathInformation path : paths) {
            // The node directly below the original fork on this path (paths are ordered bottom-up).
            final int indexOfFork = path.getNodes().indexOf(correspondingFork);
            final NodeBase childOfOriginalFork = path.getNodes().get(indexOfFork - 1);

            childOfOriginalFork.removeParent(correspondingFork);
            childOfOriginalFork.addParent(newFork);
        }

        newFork.addParent(correspondingFork);

        final Join newJoin = newJoin(newFork);

        for (final PathInformation path : paths) {
            newJoin.addParent(path.getBottom());
        }

        return newJoin;
    }

    // Finds the closest (lowest-level) branching node where at least two of the given paths meet.
    private BranchingToClose chooseBranchingToClose(final List<PathInformation> paths) {
        int maxPathLength = 0;
        for (final PathInformation pathInformation : paths) {
            if (maxPathLength < pathInformation.getNodes().size()) {
                maxPathLength = pathInformation.getNodes().size();
            }
        }

        // Level 0 is the bottom of each path; scan upwards level by level.
        for (int ixLevel = 0; ixLevel < maxPathLength; ++ixLevel) {
            final BranchingToClose foundAtThisLevel = chooseBranchingToClose(paths, ixLevel);

            if (foundAtThisLevel != null) {
                return foundAtThisLevel;
            }
        }

        throw new IllegalStateException("We should never reach here.");
    }

    // Looks for a node at the given level that is shared by more than one path; null if none exists at this level.
    private BranchingToClose chooseBranchingToClose(final List<PathInformation> paths, final int ixLevel) {
        for (final PathInformation path : paths) {
            if (ixLevel < path.getNodes().size()) {
                final NodeBase branching = path.getNodes().get(ixLevel);

                final List<PathInformation> pathsMeetingAtCurrentFork = getPathsContainingNode(branching, paths);

                if (pathsMeetingAtCurrentFork.size() > 1) {
                    final
boolean needToSplitJoin = pathsMeetingAtCurrentFork.size() < paths.size();

                    // If branching is not a Fork or a Decision, then it is a redundant parent.
                    if (branching instanceof Fork) {
                        return BranchingToClose.withFork((Fork) branching, pathsMeetingAtCurrentFork, needToSplitJoin);
                    } else if (branching instanceof Decision) {
                        return BranchingToClose.withDecision((Decision) branching, pathsMeetingAtCurrentFork, needToSplitJoin);
                    }
                    else {
                        return BranchingToClose.withRedundantParent(branching, pathsMeetingAtCurrentFork, needToSplitJoin);
                    }
                }
            }
        }

        return null;
    }

    // Returns the subset of 'paths' whose node list contains 'node'.
    private List<PathInformation> getPathsContainingNode(final NodeBase node, final List<PathInformation> paths) {
        final List<PathInformation> pathsContainingNode = new ArrayList<>();

        for (final PathInformation pathInformationMaybeContaining : paths) {
            if (pathInformationMaybeContaining.getNodes().contains(node)) {
                pathsContainingNode.add(pathInformationMaybeContaining);
            }
        }

        return pathsContainingNode;
    }

    // Walks upwards from 'node' to the start node, collecting the visited nodes bottom-up.
    // Join and DecisionJoin nodes are skipped over to their branching pair instead of a parent.
    private PathInformation getPathInfo(final NodeBase node) {
        NodeBase current = node;

        final List<NodeBase> nodes = new ArrayList<>();

        while (current != start) {
            nodes.add(current);

            if (current instanceof Join) {
                // Get the fork pair of this join and go towards that
                final Fork forkPair = ((Join) current).getBranchingPair();
                current = forkPair;
            }
            else if (current instanceof DecisionJoin) {
                final Decision decisionPair = ((DecisionJoin) current).getBranchingPair();
                current = decisionPair;
            }
            else {
                current = getSingleParent(current);
            }
        }

        return new PathInformation(nodes);
    }

    // A decision is closed when the number of its closed paths equals the number of its children.
    private boolean isDecisionClosed(final Decision decision) {
        final Integer closedPathsOfDecisionNode = closedPathsOfDecisionNodes.get(decision);
        return closedPathsOfDecisionNode != null && decision.getChildren().size() == closedPathsOfDecisionNode;
    }

    // Returns the unique parent of the given node, dispatching on the concrete node type.
    private NodeBase getSingleParent(final NodeBase node) {
        if (node instanceof End) {
return ((End) node).getParent();
        }
        else if (node instanceof Fork) {
            return ((Fork) node).getParent();
        }
        else if (node instanceof Decision) {
            return ((Decision) node).getParent();
        }
        else if (node instanceof ExplicitNode) {
            return ((ExplicitNode) node).getParent();
        }
        else if (node instanceof Start) {
            throw new IllegalStateException("Start nodes have no parent.");
        }
        else if (node instanceof Join) {
            final Join join = (Join) node;
            final int numberOfParents = join.getParents().size();
            if (numberOfParents != 1) {
                throw new IllegalStateException("The join node called '" + node.getName()
                        + "' has " + numberOfParents + " parents instead of 1.");
            }

            return join.getParents().get(0);
        }
        // NOTE(review): this null check is reached correctly because 'instanceof' is null-safe,
        // but it would read better as the first check of the method.
        else if (node == null) {
            throw new IllegalArgumentException("Null node found.");
        }

        throw new IllegalArgumentException("Unknown node type.");
    }

    // Returns the first descendant that is not inside a closed fork / join pair.
    private NodeBase getNearestNonClosedDescendant(final NodeBase node) {
        NodeBase current = node;

        while (closingJoins.containsKey(current)) {
            current = closingJoins.get(current);
        }

        return current;
    }

    // Adds 'parentWithCondition' as a parent of 'node', inserting a new fork when the parent
    // already has a (non-fork, non-decision) child, so that the parent never has two unforked children.
    private void addParentWithForkIfNeeded(final NodeBase node, final DagNodeWithCondition parentWithCondition) {
        final NodeBase parent = parentWithCondition.getNode();
        final Condition condition = parentWithCondition.getCondition();
        if (parent.getChildren().isEmpty() || parent instanceof Fork || parent instanceof Decision) {
            if (condition != null) {
                if (!(parent instanceof Decision)) {
                    throw new IllegalStateException("Trying to add a conditional parent that is not a decision.");
                }

                node.addParentWithCondition((Decision) parent, condition);
            }
            else {
                node.addParent(parent);
            }
        }
        else {
            // If there is no child, we never get to this point.
            // There is only one child, otherwise it is a fork and we don't get here.
final NodeBase child = parent.getChildren().get(0);

            if (child instanceof Fork) {
                // Reuse the existing fork under the parent.
                node.addParent(child);
            }
            else if (child instanceof Join) {
                addParentWithForkIfNeeded(node, new DagNodeWithCondition(child, null));
            }
            else {
                // Interpose a new fork between the parent and its two children.
                final Fork newFork = newFork();

                child.removeParent(parent);
                child.addParent(newFork);
                node.addParent(newFork);
                newFork.addParent(parent);
            }
        }
    }

    // Removes 'parent' from 'node'; if the parent is a fork left with a single child,
    // the now-redundant fork is dissolved and removed from the name registry.
    private void removeParentWithForkIfNeeded(final NodeBase node, final NodeBase parent) {
        node.removeParent(parent);

        final boolean isParentForkAndHasOneChild = parent instanceof Fork && parent.getChildren().size() == 1;
        if (isParentForkAndHasOneChild) {
            final NodeBase grandparent = ((Fork) parent).getParent();
            final NodeBase child = parent.getChildren().get(0);

            removeParentWithForkIfNeeded(parent, grandparent);
            child.removeParent(parent);
            child.addParent(grandparent);
            nodesByName.remove(parent.getName());
        }
    }

    // Creates, numbers and registers a new fork ("fork1", "fork2", ...).
    private Fork newFork() {
        final Fork fork = new Fork("fork" + forkCounter);

        forkNumbers.put(fork, forkCounter);
        forkCounter++;

        storeNode(fork);

        return fork;
    }

    // Creates and registers the join paired with 'correspondingFork'; its name reuses the fork's number.
    private Join newJoin(final Fork correspondingFork) {
        final Join join = new Join("join" + forkNumbers.get(correspondingFork), correspondingFork);

        storeNode(join);

        return join;
    }

    // Creates, numbers and registers a new decision node.
    private Decision newDecision() {
        final Decision decision = new Decision("decision" + decisionCounter);

        decisionCounter++;

        storeNode(decision);

        return decision;
    }

    // Creates and registers a decision join, updating the count of closed paths of 'correspondingDecision'.
    private DecisionJoin newDecisionJoin(final Decision correspondingDecision, final int numberOfPathsClosed) {
        final DecisionJoin decisionJoin = new DecisionJoin("decisionJoin" + decisionJoinCounter, correspondingDecision);

        final Integer numberOfAlreadyClosedChildren = closedPathsOfDecisionNodes.get(correspondingDecision);
        final int newNumber = numberOfPathsClosed + (numberOfAlreadyClosedChildren == null ?
0 : numberOfAlreadyClosedChildren);

        closedPathsOfDecisionNodes.put(correspondingDecision, newNumber);

        decisionJoinCounter++;

        storeNode(decisionJoin);

        return decisionJoin;
    }

    // Returns the workflow's nodes in a dependency-respecting (topological) order:
    // a child is only appended once all of its parents are already in the list.
    private static List<Node> getNodesFromRootsToLeaves(final Workflow workflow) {
        final List<Node> nodes = new ArrayList<>(workflow.getRoots());

        for (int i = 0; i < nodes.size(); ++i) {
            final Node current = nodes.get(i);

            for (final Node child : current.getAllChildren()) {
                // Checking if every dependency has been processed; if not, we do not add the child to the list yet.
                final List<Node> dependencies = child.getAllParents();
                if (nodes.containsAll(dependencies) && !nodes.contains(child)) {
                    nodes.add(child);
                }
            }
        }

        return nodes;
    }

    public Credentials getCredentials() {
        return credentials;
    }

    // An immutable snapshot of a path through the graph; nodes are ordered bottom-up
    // (index 0 is the bottom, see getPathInfo which builds the list while walking upwards).
    private static class PathInformation {
        private final ImmutableList<NodeBase> nodes;

        PathInformation(final List<NodeBase> nodes) {
            this.nodes = new ImmutableList.Builder<NodeBase>().addAll(nodes).build();
        }

        NodeBase getBottom() {
            return nodes.get(0);
        }

        public List<NodeBase> getNodes() {
            return nodes;
        }

    }

    // A tagged union describing the branching node chosen to be closed: exactly one of
    // fork / decision / redundantParent is non-null (see the private constructor's precondition).
    private static class BranchingToClose {
        private final Fork fork;
        private final Decision decision;
        private final NodeBase redundantParent;
        private final ImmutableList<PathInformation> paths;
        private final boolean needToSplitJoin;

        static BranchingToClose withFork(final Fork fork,
                                         final List<PathInformation> paths,
                                         final boolean needToSplitJoin) {
            return new BranchingToClose(fork, null, null, paths, needToSplitJoin);
        }

        static BranchingToClose withDecision(final Decision decision,
                                             final List<PathInformation> paths,
                                             final boolean needToSplitJoin) {
            return new BranchingToClose(null, decision, null, paths, needToSplitJoin);
        }

        static BranchingToClose withRedundantParent(final NodeBase redundantParent,
                                                    final List<PathInformation> paths,
                                                    final boolean needToSplitJoin) {
            return new
BranchingToClose(null, null, redundantParent, paths, needToSplitJoin); + } + + private BranchingToClose(final Fork fork, + final Decision decision, + final NodeBase redundantParent, + final List<PathInformation> paths, + final boolean needToSplitJoin) { + checkOnlyOneIsNotNull(fork, decision, redundantParent); + + this.fork = fork; + this.decision = decision; + this.redundantParent = redundantParent; + this.paths = ImmutableList.copyOf(paths); + this.needToSplitJoin = needToSplitJoin; + } + + public Fork getFork() { + return fork; + } + + public Decision getDecision() { + return decision; + } + + NodeBase getRedundantParent() { + return redundantParent; + } + + List<PathInformation> getPaths() { + return paths; + } + + boolean isDecision() { + return decision != null; + } + + boolean isRedundantParent() { + return redundantParent != null; + } + + boolean isSplittingJoinNeeded() { + return needToSplitJoin; + } + + private void checkOnlyOneIsNotNull(final Fork fork, final Decision decision, final NodeBase redundantParent) { + int counter = 0; + + if (fork != null) { + ++counter; + } + + if (decision != null) { + ++counter; + } + + if (redundantParent != null) { + ++counter; + } + + Preconditions.checkArgument(counter == 1, "Exactly one of 'fork' and 'redundantParent' must be non-null."); + } + } +}
