FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and Bundle.java. Contributed by Ruslan Ostafiychuk
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/395675fb Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/395675fb Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/395675fb Branch: refs/heads/master Commit: 395675fb0f4096dc44f1e325681536328956440e Parents: 8c7eaa6 Author: Ruslan Ostafiychuk <[email protected]> Authored: Tue Feb 3 15:18:27 2015 +0200 Committer: Ruslan Ostafiychuk <[email protected]> Committed: Thu Apr 9 16:22:05 2015 +0300 ---------------------------------------------------------------------- falcon-regression/CHANGES.txt | 2 + .../regression/Entities/ClusterMerlin.java | 26 ++ .../falcon/regression/Entities/FeedMerlin.java | 46 ++++ .../regression/Entities/ProcessMerlin.java | 145 +++++++++++ .../falcon/regression/core/bundle/Bundle.java | 232 +++--------------- .../falcon/regression/core/util/BundleUtil.java | 2 +- .../regression/core/util/InstanceUtil.java | 102 -------- .../falcon/regression/AuthorizationTest.java | 126 +++++----- .../regression/ELExpCurrentAndLastWeekTest.java | 3 +- .../falcon/regression/ELValidationsTest.java | 3 +- .../falcon/regression/ExternalFSTest.java | 24 +- .../regression/FeedInstanceListingTest.java | 2 +- .../falcon/regression/FeedLateRerunTest.java | 20 +- .../falcon/regression/FeedReplicationTest.java | 162 ++++++------- .../apache/falcon/regression/LogMoverTest.java | 10 +- .../apache/falcon/regression/NewRetryTest.java | 59 ++--- .../falcon/regression/NoOutputProcessTest.java | 3 +- .../falcon/regression/ProcessFrequencyTest.java | 3 +- .../ProcessInstanceColoMixedTest.java | 27 +-- .../regression/ProcessInstanceKillsTest.java | 3 +- .../regression/ProcessInstanceResumeTest.java | 3 +- .../regression/ProcessInstanceRunningTest.java | 3 +- .../regression/ProcessInstanceStatusTest.java | 3 +- .../regression/ProcessInstanceSuspendTest.java | 8 +- .../falcon/regression/ProcessLateRerunTest.java | 19 +- .../falcon/regression/ProcessSLATest.java | 10 +- .../ValidateAPIPrismAndServerTest.java | 4 +- .../entity/EntitiesPatternSearchTest.java | 12 +- .../regression/entity/ListEntitiesTest.java | 2 +- .../falcon/regression/hcat/HCatProcessTest.java | 11 +- .../prism/NewPrismProcessUpdateTest.java | 151 +++++------- .../regression/prism/OptionalInputTest.java | 45 ++-- .../PrismFeedReplicationPartitionExpTest.java | 240 +++++++++---------- .../prism/PrismFeedReplicationUpdateTest.java | 52 ++-- .../regression/prism/PrismFeedUpdateTest.java | 8 +- .../prism/PrismProcessDeleteTest.java | 110 ++++----- .../prism/PrismProcessScheduleTest.java | 4 +- .../regression/prism/PrismProcessSnSTest.java | 2 +- .../regression/prism/PrismSubmitTest.java | 16 +- .../prism/ProcessPartitionExpVariableTest.java | 4 +- .../prism/RescheduleKilledProcessTest.java | 9 +- .../RescheduleProcessInFinalStatesTest.java | 3 +- .../prism/UpdateAtSpecificTimeTest.java | 12 +- .../falcon/regression/ui/ProcessUITest.java | 4 +- 44 files changed, 808 insertions(+), 927 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/CHANGES.txt ---------------------------------------------------------------------- diff --git a/falcon-regression/CHANGES.txt b/falcon-regression/CHANGES.txt index 2d0e4c3..76c1090 100644 --- a/falcon-regression/CHANGES.txt +++ b/falcon-regression/CHANGES.txt @@ -63,6 +63,8 @@ Trunk (Unreleased) via Samarth Gupta) IMPROVEMENTS + FALCON-1135 Migrate methods related to *Merlin.java classes from InstanceUtil.java and + Bundle.java (Ruslan Ostafiychuk) FALCON-1088 Fixing FeedDelayParallelTimeoutTest and renaming it to FeedDelayTest(Pragya M via Samarth G) http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java index 22ec5da..a99b307 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ClusterMerlin.java @@ -23,11 +23,17 @@ import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.ACL; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.cluster.ClusterLocationType; +import org.apache.falcon.entity.v0.cluster.Interface; +import org.apache.falcon.entity.v0.cluster.Interfaces; +import org.apache.falcon.entity.v0.cluster.Interfacetype; +import org.apache.falcon.entity.v0.cluster.Location; import org.testng.Assert; import javax.xml.bind.JAXBException; import java.io.StringWriter; import java.util.HashMap; +import java.util.List; import java.util.Map; /** Class for representing a cluster xml. */ @@ -78,4 +84,24 @@ public class ClusterMerlin extends Cluster { acl.setPermission(permission); this.setACL(acl); } + + public void setInterface(Interfacetype interfacetype, String value) { + final Interfaces interfaces = this.getInterfaces(); + final List<Interface> interfaceList = interfaces.getInterfaces(); + for (final Interface anInterface : interfaceList) { + if (anInterface.getType() == interfacetype) { + anInterface.setEndpoint(value); + } + } + } + + public void setWorkingLocationPath(String path) { + for (Location location : getLocations().getLocations()) { + if (location.getName() == ClusterLocationType.WORKING) { + location.setPath(path); + break; + } + } + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java index 70e2e73..1b59227 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/FeedMerlin.java @@ -328,4 +328,50 @@ public class FeedMerlin extends Feed { this.setSla(sla); } + /** + * Sets new feed data path (for first location). + * + * @param path new feed data path + */ + public void setFilePath(String path) { + getLocations().getLocations().get(0).setPath(path); + } + + + /** + * Retrieves prefix (main sub-folders) of first feed data path. + */ + public String getFeedPrefix() { + String path = getLocations().getLocations().get(0).getPath(); + return path.substring(0, path.indexOf('$')); + } + + public void setValidity(String feedStart, String feedEnd) { + this.getClusters().getClusters().get(0).getValidity() + .setStart(TimeUtil.oozieDateToDate(feedStart).toDate()); + this.getClusters().getClusters().get(0).getValidity() + .setEnd(TimeUtil.oozieDateToDate(feedEnd).toDate()); + + } + + public void setDataLocationPath(String path) { + final List<Location> locations = this.getLocations().getLocations(); + for (Location location : locations) { + if (location.getType() == LocationType.DATA) { + location.setPath(path); + } + } + } + + public void setPeriodicity(int frequency, Frequency.TimeUnit periodicity) { + Frequency frq = new Frequency(String.valueOf(frequency), periodicity); + this.setFrequency(frq); + } + + public void setTableUri(String tableUri) { + final CatalogTable catalogTable = new CatalogTable(); + catalogTable.setUri(tableUri); + this.setTable(catalogTable); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java index 01fdd04..8732a44 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/Entities/ProcessMerlin.java @@ -24,6 +24,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.process.EngineType; import org.apache.falcon.entity.v0.process.Sla; import org.apache.falcon.entity.v0.process.ACL; import org.apache.falcon.entity.v0.process.Cluster; @@ -35,6 +36,7 @@ import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.entity.v0.process.Properties; import org.apache.falcon.entity.v0.process.Property; import org.apache.falcon.entity.v0.process.Validity; +import org.apache.falcon.entity.v0.process.Workflow; import org.apache.falcon.regression.core.util.TimeUtil; import org.testng.Assert; @@ -268,6 +270,149 @@ public class ProcessMerlin extends Process { this.setSla(sla); } + /** + * Sets new process validity on all the process clusters. + * + * @param startTime start of process validity + * @param endTime end of process validity + */ + public void setValidity(String startTime, String endTime) { + + for (Cluster cluster : this.getClusters().getClusters()) { + cluster.getValidity().setStart(TimeUtil.oozieDateToDate(startTime).toDate()); + cluster.getValidity().setEnd(TimeUtil.oozieDateToDate(endTime).toDate()); + } + } + + /** + * Adds one output into process. + */ + public void addOutputFeed(String outputName, String feedName) { + Output out1 = getOutputs().getOutputs().get(0); + Output out2 = new Output(); + out2.setFeed(feedName); + out2.setName(outputName); + out2.setInstance(out1.getInstance()); + getOutputs().getOutputs().add(out2); + } + + + + /** + * Adds one input into process. + */ + public void addInputFeed(String inputName, String feedName) { + Input in1 = getInputs().getInputs().get(0); + Input in2 = new Input(); + in2.setEnd(in1.getEnd()); + in2.setFeed(feedName); + in2.setName(inputName); + in2.setPartition(in1.getPartition()); + in2.setStart(in1.getStart()); + in2.setOptional(in1.isOptional()); + getInputs().getInputs().add(in2); + } + + + public void setInputFeedWithEl(String inputFeedName, String startEl, String endEl) { + Inputs inputs = new Inputs(); + Input input = new Input(); + input.setFeed(inputFeedName); + input.setStart(startEl); + input.setEnd(endEl); + input.setName("inputData"); + inputs.getInputs().add(input); + this.setInputs(inputs); + } + + public void setDatasetInstances(String startInstance, String endInstance) { + this.getInputs().getInputs().get(0).setStart(startInstance); + this.getInputs().getInputs().get(0).setEnd(endInstance); + } + + public void setProcessInputStartEnd(String start, String end) { + for (Input input : this.getInputs().getInputs()) { + input.setStart(start); + input.setEnd(end); + } + } + + /** + * Sets name(s) of the process output(s). + * + * @param names new names of the outputs + */ + public void setOutputNames(String... names) { + Outputs outputs = this.getOutputs(); + Assert.assertEquals(outputs.getOutputs().size(), names.length, + "Number of output names is not equal to number of outputs in process"); + for (int i = 0; i < names.length; i++) { + outputs.getOutputs().get(i).setName(names[i]); + } + this.setOutputs(outputs); + } + + + /** + * Sets partition for each input, according to number of supplied partitions. + * + * @param partition partitions to be set + */ + public void setInputPartition(String... partition) { + for (int i = 0; i < partition.length; i++) { + this.getInputs().getInputs().get(i).setPartition(partition[i]); + } + } + + /** + * Adds optional property to process definition. + * + * @param properties desired properties to be added + */ + public void addProperties(Property... properties) { + for (Property property : properties) { + this.getProperties().getProperties().add(property); + } + } + + /** + * Changes names of process inputs. + * + * @param names desired names of inputs + */ + public void setInputNames(String... names) { + for (int i = 0; i < names.length; i++) { + this.getInputs().getInputs().get(i).setName(names[i]); + } + } + + public void setPeriodicity(int frequency, Frequency.TimeUnit periodicity) { + Frequency frq = new Frequency(String.valueOf(frequency), periodicity); + this.setFrequency(frq); + } + + public void setTimeOut(int magnitude, Frequency.TimeUnit unit) { + Frequency frq = new Frequency(String.valueOf(magnitude), unit); + this.setTimeout(frq); + } + + + + public void setWorkflow(String wfPath, String libPath, EngineType engineType) { + Workflow w = this.getWorkflow(); + if (engineType != null) { + w.setEngine(engineType); + } + if (libPath != null) { + w.setLib(libPath); + } + w.setPath(wfPath); + this.setWorkflow(w); + } + + public String getFirstInputName() { + return getInputs().getInputs().get(0).getName(); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java index b0fa0a5..2f43972 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/bundle/Bundle.java @@ -21,24 +21,14 @@ package org.apache.falcon.regression.core.bundle; import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.Frequency.TimeUnit; -import org.apache.falcon.entity.v0.cluster.ClusterLocationType; -import org.apache.falcon.entity.v0.cluster.Interface; -import org.apache.falcon.entity.v0.cluster.Interfaces; import org.apache.falcon.entity.v0.cluster.Interfacetype; -import org.apache.falcon.entity.v0.feed.CatalogTable; import org.apache.falcon.entity.v0.feed.ClusterType; -import org.apache.falcon.entity.v0.feed.Location; -import org.apache.falcon.entity.v0.feed.LocationType; +import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Cluster; import org.apache.falcon.entity.v0.process.EngineType; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.entity.v0.process.Inputs; import org.apache.falcon.entity.v0.process.LateProcess; -import org.apache.falcon.entity.v0.process.Output; -import org.apache.falcon.entity.v0.process.Outputs; import org.apache.falcon.entity.v0.process.Property; import org.apache.falcon.entity.v0.process.Retry; -import org.apache.falcon.entity.v0.process.Workflow; import org.apache.falcon.regression.Entities.ClusterMerlin; import org.apache.falcon.regression.Entities.FeedMerlin; import org.apache.falcon.regression.Entities.ProcessMerlin; @@ -50,9 +40,6 @@ import org.apache.falcon.regression.core.util.TimeUtil; import org.apache.falcon.regression.core.util.Util; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.log4j.Logger; -import org.joda.time.DateTime; -import org.joda.time.format.DateTimeFormat; -import org.joda.time.format.DateTimeFormatter; import org.testng.Assert; import javax.xml.bind.JAXBException; @@ -228,7 +215,7 @@ public class Bundle { */ public void generateUniqueBundle(String prefix) { /* creating new names */ - List<ClusterMerlin> clusterMerlinList = BundleUtil.fromString(clusters); + List<ClusterMerlin> clusterMerlinList = BundleUtil.getClustersFromStrings(clusters); Map<String, String> clusterNameMap = new HashMap<String, String>(); for (ClusterMerlin clusterMerlin : clusterMerlinList) { clusterNameMap.putAll(clusterMerlin.setUniqueName(prefix)); @@ -309,14 +296,7 @@ public class Bundle { */ public void setProcessInput(String startEl, String endEl) { ProcessMerlin process = getProcessObject(); - Inputs inputs = new Inputs(); - Input input = new Input(); - input.setFeed(Util.readEntityName(getInputFeedFromBundle())); - input.setStart(startEl); - input.setEnd(endEl); - input.setName("inputData"); - inputs.getInputs().add(input); - process.setInputs(inputs); + process.setInputFeedWithEl(Util.readEntityName(getInputFeedFromBundle()), startEl, endEl); this.setProcessData(process.toString()); } @@ -342,10 +322,7 @@ public class Bundle { public void setFeedValidity(String feedStart, String feedEnd, String feedName) { FeedMerlin feedElement = getFeedElement(feedName); - feedElement.getClusters().getClusters().get(0).getValidity() - .setStart(TimeUtil.oozieDateToDate(feedStart).toDate()); - feedElement.getClusters().getClusters().get(0).getValidity() - .setEnd(TimeUtil.oozieDateToDate(feedEnd).toDate()); + feedElement.setValidity(feedStart, feedEnd); writeFeedElement(feedElement, feedName); } @@ -377,24 +354,19 @@ public class Bundle { public void setDatasetInstances(String startInstance, String endInstance) { ProcessMerlin processElement = getProcessObject(); - processElement.getInputs().getInputs().get(0).setStart(startInstance); - processElement.getInputs().getInputs().get(0).setEnd(endInstance); + processElement.setDatasetInstances(startInstance, endInstance); setProcessData(processElement.toString()); } public void setProcessPeriodicity(int frequency, TimeUnit periodicity) { ProcessMerlin processElement = getProcessObject(); - Frequency frq = new Frequency("" + frequency, periodicity); - processElement.setFrequency(frq); + processElement.setPeriodicity(frequency, periodicity); setProcessData(processElement.toString()); } public void setProcessInputStartEnd(String start, String end) { ProcessMerlin processElement = getProcessObject(); - for (Input input : processElement.getInputs().getInputs()) { - input.setStart(start); - input.setEnd(end); - } + processElement.setProcessInputStartEnd(start, end); setProcessData(processElement.toString()); } @@ -421,23 +393,10 @@ public class Bundle { } public void setOutputFeedLocationData(String path) { - ProcessMerlin processElement = new ProcessMerlin(processData); - String outputDataset = null; - int datasetIndex; - for (datasetIndex = 0; datasetIndex < dataSets.size(); datasetIndex++) { - outputDataset = dataSets.get(datasetIndex); - if (outputDataset.contains(processElement.getOutputs().getOutputs().get(0).getFeed())) { - break; - } - } - - FeedMerlin feedElement = new FeedMerlin(outputDataset); - Location l = new Location(); - l.setPath(path); - l.setType(LocationType.DATA); - feedElement.getLocations().getLocations().set(0, l); - dataSets.set(datasetIndex, feedElement.toString()); - LOGGER.info("modified location path dataSet is: " + dataSets.get(datasetIndex)); + FeedMerlin feedElement = getFeedElement(getOutputFeedNameFromBundle()); + feedElement.setDataLocationPath(path); + writeFeedElement(feedElement, feedElement.getName()); + LOGGER.info("modified location path dataSet is: " + feedElement); } public void setProcessConcurrency(int concurrency) { @@ -456,15 +415,7 @@ public class Bundle { public void setProcessWorkflow(String wfPath, String libPath, EngineType engineType) { ProcessMerlin processElement = getProcessObject(); - Workflow w = processElement.getWorkflow(); - if (engineType != null) { - w.setEngine(engineType); - } - if (libPath != null) { - w.setLib(libPath); - } - w.setPath(wfPath); - processElement.setWorkflow(w); + processElement.setWorkflow(wfPath, libPath, engineType); setProcessData(processElement.toString()); } @@ -501,8 +452,7 @@ public class Bundle { public void setInputFeedPeriodicity(int frequency, TimeUnit periodicity) { String feedName = getInputFeedNameFromBundle(); FeedMerlin feedElement = getFeedElement(feedName); - Frequency frq = new Frequency("" + frequency, periodicity); - feedElement.setFrequency(frq); + feedElement.setPeriodicity(frequency, periodicity); writeFeedElement(feedElement, feedName); } @@ -520,12 +470,7 @@ public class Bundle { public void setInputFeedDataPath(String path) { String feedName = getInputFeedNameFromBundle(); FeedMerlin feedElement = getFeedElement(feedName); - final List<Location> locations = feedElement.getLocations().getLocations(); - for (Location location : locations) { - if (location.getType() == LocationType.DATA) { - locations.get(0).setPath(path); - } - } + feedElement.setDataLocationPath(path); writeFeedElement(feedElement, feedName); } @@ -536,40 +481,9 @@ public class Bundle { .getPath()); } - public void setProcessValidity(DateTime startDate, DateTime endDate) { - - DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd/HH:mm"); - - String start = formatter.print(startDate).replace("/", "T") + "Z"; - String end = formatter.print(endDate).replace("/", "T") + "Z"; - - ProcessMerlin processElement = new ProcessMerlin(processData); - - for (Cluster cluster : processElement.getClusters().getClusters()) { - - org.apache.falcon.entity.v0.process.Validity validity = - new org.apache.falcon.entity.v0.process.Validity(); - validity.setStart(TimeUtil.oozieDateToDate(start).toDate()); - validity.setEnd(TimeUtil.oozieDateToDate(end).toDate()); - cluster.setValidity(validity); - - } - - processData = processElement.toString(); - } - public void setProcessValidity(String startDate, String endDate) { ProcessMerlin processElement = new ProcessMerlin(processData); - - for (Cluster cluster : processElement.getClusters().getClusters()) { - org.apache.falcon.entity.v0.process.Validity validity = - new org.apache.falcon.entity.v0.process.Validity(); - validity.setStart(TimeUtil.oozieDateToDate(startDate).toDate()); - validity.setEnd(TimeUtil.oozieDateToDate(endDate).toDate()); - cluster.setValidity(validity); - - } - + processElement.setValidity(startDate, endDate); processData = processElement.toString(); } @@ -603,16 +517,9 @@ public class Bundle { } } - public void addProcessInput(String feed, String feedName) { + public void addProcessInput(String inputName, String feedName) { ProcessMerlin processElement = getProcessObject(); - Input in1 = processElement.getInputs().getInputs().get(0); - Input in2 = new Input(); - in2.setEnd(in1.getEnd()); - in2.setFeed(feed); - in2.setName(feedName); - in2.setPartition(in1.getPartition()); - in2.setStart(in1.getStart()); - processElement.getInputs().getInputs().add(in2); + processElement.addInputFeed(inputName, feedName); setProcessData(processElement.toString()); } @@ -654,43 +561,27 @@ public class Bundle { public void setClusterInterface(Interfacetype interfacetype, String value) { ClusterMerlin c = getClusterElement(); - final Interfaces interfaces = c.getInterfaces(); - final List<Interface> interfaceList = interfaces.getInterfaces(); - for (final Interface anInterface : interfaceList) { - if (anInterface.getType() == interfacetype) { - anInterface.setEndpoint(value); - } - } + c.setInterface(interfacetype, value); writeClusterElement(c); } public void setInputFeedTableUri(String tableUri) { final String feedStr = getInputFeedFromBundle(); FeedMerlin feed = new FeedMerlin(feedStr); - final CatalogTable catalogTable = new CatalogTable(); - catalogTable.setUri(tableUri); - feed.setTable(catalogTable); + feed.setTableUri(tableUri); writeFeedElement(feed, feed.getName()); } public void setOutputFeedTableUri(String tableUri) { final String feedStr = getOutputFeedFromBundle(); FeedMerlin feed = new FeedMerlin(feedStr); - final CatalogTable catalogTable = new CatalogTable(); - catalogTable.setUri(tableUri); - feed.setTable(catalogTable); + feed.setTableUri(tableUri); writeFeedElement(feed, feed.getName()); } public void setCLusterWorkingPath(String clusterData, String path) { ClusterMerlin c = new ClusterMerlin(clusterData); - for (int i = 0; i < c.getLocations().getLocations().size(); i++) { - if (c.getLocations().getLocations().get(i).getName().equals(ClusterLocationType.WORKING)) { - c.getLocations().getLocations().get(i).setPath(path); - } - } - - //this.setClusterData(clusterData) + c.setWorkingLocationPath(path); writeClusterElement(c); } @@ -795,17 +686,13 @@ public class Bundle { public void setProcessLibPath(String libPath) { ProcessMerlin processElement = getProcessObject(); - Workflow wf = processElement.getWorkflow(); - wf.setLib(libPath); - processElement.setWorkflow(wf); + processElement.getWorkflow().setLib(libPath); setProcessData(processElement.toString()); - } public void setProcessTimeOut(int magnitude, TimeUnit unit) { ProcessMerlin processElement = getProcessObject(); - Frequency frq = new Frequency("" + magnitude, unit); - processElement.setTimeout(frq); + processElement.setTimeOut(magnitude, unit); setProcessData(processElement.toString()); } @@ -901,9 +788,7 @@ public class Bundle { */ public void setProcessInputNames(String... names) { ProcessMerlin p = new ProcessMerlin(processData); - for (int i = 0; i < names.length; i++) { - p.getInputs().getInputs().get(i).setName(names[i]); - } + p.setInputNames(names); processData = p.toString(); } @@ -914,9 +799,7 @@ public class Bundle { */ public void addProcessProperty(Property... properties) { ProcessMerlin p = new ProcessMerlin(processData); - for (Property property : properties) { - p.getProperties().getProperties().add(property); - } + p.addProperties(properties); processData = p.toString(); } @@ -927,9 +810,7 @@ public class Bundle { */ public void setProcessInputPartition(String... partition) { ProcessMerlin p = new ProcessMerlin(processData); - for (int i = 0; i < partition.length; i++) { - p.getInputs().getInputs().get(i).setPartition(partition[i]); - } + p.setInputPartition(partition); processData = p.toString(); } @@ -940,46 +821,23 @@ public class Bundle { */ public void setProcessOutputNames(String... names) { ProcessMerlin p = new ProcessMerlin(processData); - Outputs outputs = p.getOutputs(); - Assert.assertEquals(outputs.getOutputs().size(), names.length, - "Number of output names is not equal to number of outputs in process"); - for (int i = 0; i < names.length; i++) { - outputs.getOutputs().get(i).setName(names[i]); - } - p.setOutputs(outputs); + p.setOutputNames(names); processData = p.toString(); } - public void addInputFeedToBundle(String feedRefName, String feed, int templateInputIdx) { - this.getDataSets().add(feed); - String feedName = Util.readEntityName(feed); - String vProcessData = getProcessData(); - - ProcessMerlin processObject = new ProcessMerlin(vProcessData); - final List<Input> processInputs = processObject.getInputs().getInputs(); - Input templateInput = processInputs.get(templateInputIdx); - Input newInput = new Input(); - newInput.setFeed(feedName); - newInput.setName(feedRefName); - newInput.setOptional(templateInput.isOptional()); - newInput.setStart(templateInput.getStart()); - newInput.setEnd(templateInput.getEnd()); - newInput.setPartition(templateInput.getPartition()); - processInputs.add(newInput); + public void addInputFeedToBundle(String feedRefName, Feed feed) { + this.getDataSets().add(feed.toString()); + + ProcessMerlin processObject = new ProcessMerlin(processData); + processObject.addInputFeed(feedRefName, feed.getName()); setProcessData(processObject.toString()); } - public void addOutputFeedToBundle(String feedRefName, String feed, int templateOutputIdx) { - this.getDataSets().add(feed); - String feedName = Util.readEntityName(feed); + public void addOutputFeedToBundle(String feedRefName, Feed feed) { + this.getDataSets().add(feed.toString()); + ProcessMerlin processObject = getProcessObject(); - final List<Output> processOutputs = processObject.getOutputs().getOutputs(); - Output templateOutput = processOutputs.get(templateOutputIdx); - Output newOutput = new Output(); - newOutput.setFeed(feedName); - newOutput.setName(feedRefName); - newOutput.setInstance(templateOutput.getInstance()); - processOutputs.add(newOutput); + processObject.addOutputFeed(feedRefName, feed.getName()); setProcessData(processObject.toString()); } @@ -1000,26 +858,12 @@ public class Bundle { public String getInputFeedFromBundle() { ProcessMerlin processObject = new ProcessMerlin(getProcessData()); - for (Input input : processObject.getInputs().getInputs()) { - for (String feed : getDataSets()) { - if (Util.readEntityName(feed).equalsIgnoreCase(input.getFeed())) { - return feed; - } - } - } - return null; + return getFeed(processObject.getInputs().getInputs().get(0).getFeed()); } public String getOutputFeedFromBundle() { ProcessMerlin processObject = new ProcessMerlin(getProcessData()); - for (Output output : processObject.getOutputs().getOutputs()) { - for (String feed : getDataSets()) { - if (Util.readEntityName(feed).equalsIgnoreCase(output.getFeed())) { - return feed; - } - } - } - return null; + return getFeed(processObject.getOutputs().getOutputs().get(0).getFeed()); } public String getOutputFeedNameFromBundle() { http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java index 0b2c4e1..a825845 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/BundleUtil.java @@ -237,7 +237,7 @@ public final class BundleUtil { return property; } - public static List<ClusterMerlin> fromString(List<String> clusterStrings) { + public static List<ClusterMerlin> getClustersFromStrings(List<String> clusterStrings) { List<ClusterMerlin> clusters = new ArrayList<ClusterMerlin>(); for (String clusterString : clusterStrings) { clusters.add(new ClusterMerlin(clusterString)); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java index 4620787..6c90256 100644 --- a/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java +++ b/falcon-regression/merlin-core/src/main/java/org/apache/falcon/regression/core/util/InstanceUtil.java @@ -26,10 +26,6 @@ import com.google.gson.JsonSyntaxException; import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.entity.v0.Frequency; -import org.apache.falcon.entity.v0.process.Input; -import org.apache.falcon.regression.Entities.FeedMerlin; -import org.apache.falcon.regression.Entities.ProcessMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.regression.core.enumsAndConstants.ResponseErrors; import org.apache.falcon.regression.core.helpers.ColoHelper; @@ -527,37 +523,6 @@ public final class InstanceUtil { return InstanceUtil.sendRequestProcessInstance(url, user); } - /** - * Retrieves prefix (main sub-folders) of feed data path. - */ - public static String getFeedPrefix(String feed) { - FeedMerlin feedElement = new FeedMerlin(feed); - String locationPath = feedElement.getLocations().getLocations().get(0).getPath(); - locationPath = locationPath.substring(0, locationPath.indexOf('$')); - return locationPath; - } - - /** - * Adds one input into process. - * - * @param process - where input should be inserted - * @param feed - feed which will be used as input feed - * @return - string representation of process definition - */ - public static String addProcessInputFeed(String process, String feed, String feedName) { - - ProcessMerlin processElement = new ProcessMerlin(process); - Input in1 = processElement.getInputs().getInputs().get(0); - Input in2 = new Input(); - in2.setEnd(in1.getEnd()); - in2.setFeed(feed); - in2.setName(feedName); - in2.setPartition(in1.getPartition()); - in2.setStart(in1.getStart()); - processElement.getInputs().getInputs().add(in2); - return processElement.toString(); - } - public static org.apache.oozie.client.WorkflowJob.Status getInstanceStatusFromCoord( ColoHelper coloHelper, String coordID, int instanceNumber) throws OozieClientException { OozieClient oozieClient = coloHelper.getProcessHelper().getOozieClient(); @@ -598,21 +563,6 @@ public final class InstanceUtil { return actionInfo.getRun(); } - - /** - * Sets new feed data path. - * - * @param feed feed which is to be modified - * @param path new feed data path - * @return modified feed - */ - public static String setFeedFilePath(String feed, String path) { - - FeedMerlin feedElement = new FeedMerlin(feed); - feedElement.getLocations().getLocations().get(0).setPath(path); - return feedElement.toString(); - } - public static int checkIfFeedCoordExist(AbstractEntityHelper helper, String feedName, String coordType) throws OozieClientException { LOGGER.info("feedName: " + feedName); @@ -640,47 +590,6 @@ public final class InstanceUtil { return numberOfCoord; } - /** - * Sets process frequency. - * - * @return modified process definition - */ - public static String setProcessFrequency(String process, Frequency frequency) { - ProcessMerlin p = new ProcessMerlin(process); - p.setFrequency(frequency); - return p.toString(); - } - - /** - * Sets new process name. - */ - public static String setProcessName(String process, String newName) { - ProcessMerlin p = new ProcessMerlin(process); - p.setName(newName); - return p.toString(); - } - - /** - * Sets new process validity on all the process clusters. - * - * @param process process entity to be modified - * @param startTime start of process validity - * @param endTime end of process validity - * @return modified process definition - */ - public static String setProcessValidity(String process, - String startTime, String endTime) { - ProcessMerlin processElement = new ProcessMerlin(process); - - for (int i = 0; i < processElement.getClusters().getClusters().size(); i++) { - processElement.getClusters().getClusters().get(i).getValidity().setStart( - TimeUtil.oozieDateToDate(startTime).toDate()); - processElement.getClusters().getClusters().get(i).getValidity() - .setEnd(TimeUtil.oozieDateToDate(endTime).toDate()); - } - return processElement.toString(); - } - public static List<CoordinatorAction> getProcessInstanceListFromAllBundles( ColoHelper coloHelper, String processName, EntityType entityType) throws OozieClientException { @@ -950,17 +859,6 @@ public final class InstanceUtil { } /** - * Sets feed frequency. - * - * @return modified feed - */ - public static String setFeedFrequency(String feed, Frequency f) { - FeedMerlin feedElement = new FeedMerlin(feed); - feedElement.setFrequency(f); - return feedElement.toString(); - } - - /** * Waits till instances of specific job will be created during specific time. * Use this method directly in unusual test cases where timeouts are different from trivial. * In other cases use waitTillInstancesAreCreated(ColoHelper,String,int) http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java index eaa69f0..02280f3 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/AuthorizationTest.java @@ -287,36 +287,32 @@ public class AuthorizationTest extends BaseTestClass { bundles[0].submitFeedsScheduleProcess(prism); //check that there are 3 running instances - InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0] - .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS); + InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3, + CoordinatorAction.Status.RUNNING, EntityType.PROCESS); //check that there are 2 waiting instances - InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0] - .getProcessData()), 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS); + InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 2, + CoordinatorAction.Status.WAITING, EntityType.PROCESS); //3 instances should be running , other 2 should be waiting - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util - .readEntityName(bundles[0].getProcessData()), + InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(), "?start=" + startTime + "&end=" + endTime); InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0); //suspend 3 running instances - r = prism.getProcessHelper().getProcessInstanceSuspend(Util - .readEntityName(bundles[0].getProcessData()), + r = prism.getProcessHelper().getProcessInstanceSuspend(bundles[0].getProcessName(), "?start=" + startTime + "&end=" + midTime); InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0); //try to resume suspended instances by U2 - r = prism.getProcessHelper().getProcessInstanceResume(Util.readEntityName(bundles[0] - .getProcessData()), "?start=" + startTime + "&end=" + midTime, - MerlinConstants.USER2_NAME); + r = prism.getProcessHelper().getProcessInstanceResume(bundles[0].getProcessName(), "?start=" + startTime + + "&end=" + midTime, MerlinConstants.USER2_NAME); //the state of above 3 instances should still be suspended InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0); //check the status of all instances - r = prism.getProcessHelper().getProcessInstanceStatus(Util - .readEntityName(bundles[0].getProcessData()), + r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(), "?start=" + startTime + "&end=" + endTime); InstanceUtil.validateResponse(r, 5, 0, 3, 2, 0); } @@ -353,18 +349,16 @@ public class AuthorizationTest extends BaseTestClass { bundles[0].submitFeedsScheduleProcess(prism); //check that there are 3 running instances - InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0] - .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS); + InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3, + CoordinatorAction.Status.RUNNING, EntityType.PROCESS); //3 instances should be running , other 2 should be waiting - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util - .readEntityName(bundles[0].getProcessData()), + InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(), "?start=" + startTime + "&end=" + endTime); InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0); //try to kill all instances by U2 - r = prism.getProcessHelper().getProcessInstanceKill(Util - .readEntityName(bundles[0].getProcessData()), + r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(), "?start=" + startTime + "&end=" + endTime, MerlinConstants.USER2_NAME); //number of instances should be the same as before @@ -401,28 +395,25 @@ public class AuthorizationTest extends BaseTestClass { bundles[0].submitFeedsScheduleProcess(prism); //check that there are 3 running instances - InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0] - .getProcessData()), 3, CoordinatorAction.Status.RUNNING, EntityType.PROCESS); + InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 3, + CoordinatorAction.Status.RUNNING, EntityType.PROCESS); //check that there are 2 waiting instances - InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0] - .getProcessData()), 2, CoordinatorAction.Status.WAITING, EntityType.PROCESS); + InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 2, + CoordinatorAction.Status.WAITING, EntityType.PROCESS); //3 instances should be running , other 2 should be waiting - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util - .readEntityName(bundles[0].getProcessData()), + InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(), "?start=" + startTime + "&end=" + endTime); InstanceUtil.validateResponse(r, 5, 3, 0, 2, 0); //suspend 3 running instances - r = prism.getProcessHelper().getProcessInstanceSuspend(Util - .readEntityName(bundles[0].getProcessData()), + r = prism.getProcessHelper().getProcessInstanceSuspend(bundles[0].getProcessName(), "?start=" + startTime + "&end=" + midTime); InstanceUtil.validateResponse(r, 3, 0, 3, 0, 0); //try to kill all instances by U2 - r = prism.getProcessHelper().getProcessInstanceKill(Util - .readEntityName(bundles[0].getProcessData()), + r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(), "?start=" + startTime + "&end=" + endTime, MerlinConstants.USER2_NAME); //3 should still be suspended, 2 should be waiting @@ -465,29 +456,24 @@ public class AuthorizationTest extends BaseTestClass { bundles[0].submitFeedsScheduleProcess(prism); //check that there are 4 running instances - InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(bundles[0] - .getProcessData()), 4, CoordinatorAction.Status.RUNNING, EntityType.PROCESS); + InstanceUtil.waitTillInstanceReachState(clusterOC, bundles[0].getProcessName(), 4, + CoordinatorAction.Status.RUNNING, EntityType.PROCESS); //4 instances should be running , 1 should be waiting - InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(Util - .readEntityName(bundles[0].getProcessData()), + InstancesResult r = prism.getProcessHelper().getProcessInstanceStatus(bundles[0].getProcessName(), "?start=" + startTime + "&end=" + endTime); InstanceUtil.validateResponse(r, 5, 4, 0, 1, 0); //kill 3 running instances - r = prism.getProcessHelper().getProcessInstanceKill(Util - .readEntityName(bundles[0].getProcessData()), "?start=" + startTime + "&end=" - + - midTime); + r = prism.getProcessHelper().getProcessInstanceKill(bundles[0].getProcessName(), + "?start=" + startTime + "&end=" + midTime); InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3); //generally 3 instances should be killed, 1 is running and 1 is waiting //try to rerun instances by U2 - r = prism.getProcessHelper().getProcessInstanceRerun(Util - .readEntityName(bundles[0].getProcessData()), "?start=" + startTime + "&end=" - + - midTime, MerlinConstants.USER2_NAME); + r = prism.getProcessHelper().getProcessInstanceRerun(bundles[0].getProcessName(), + "?start=" + startTime + "&end=" + midTime, MerlinConstants.USER2_NAME); //instances should still be killed InstanceUtil.validateResponse(r, 3, 0, 0, 0, 3); @@ -510,7 +496,7 @@ public class AuthorizationTest extends BaseTestClass { Assert.assertTrue(definition.contains(feed.getName()) && !definition.contains("(feed) not found"), "Feed should be already submitted"); //update feed definition - FeedMerlin newFeed = new FeedMerlin(feed.toString()); + FeedMerlin newFeed = new FeedMerlin(feed); newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN); //try to update feed by U2 final ServiceResponse serviceResponse = prism.getFeedHelper().update(feed.toString(), newFeed.toString(), @@ -589,8 +575,7 @@ public class AuthorizationTest extends BaseTestClass { @Test(enabled = false) public void u1ScheduleFeedU2ScheduleDependantProcessU1UpdateFeed() throws Exception { FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle()); - String process = bundles[0].getProcessData(); - process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z"); + bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z"); //submit both feeds bundles[0].submitClusters(prism); bundles[0].submitFeeds(prism); @@ -600,13 +585,13 @@ public class AuthorizationTest extends BaseTestClass { //by U2 schedule process dependant on scheduled feed by U1 ServiceResponse serviceResponse = prism.getProcessHelper() - .submitAndSchedule(process, MerlinConstants.USER2_NAME); + .submitAndSchedule(bundles[0].getProcessData(), MerlinConstants.USER2_NAME); AssertUtil.assertSucceeded(serviceResponse); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING); + AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); //get old process details String oldProcessBundleId = InstanceUtil - .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS); + .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); String oldProcessUser = getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS); @@ -615,7 +600,7 @@ public class AuthorizationTest extends BaseTestClass { String oldFeedUser = getBundleUser(cluster, feed.getName(), EntityType.FEED); //update feed definition - FeedMerlin newFeed = new FeedMerlin(feed.toString()); + FeedMerlin newFeed = new FeedMerlin(feed); newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN); //update feed by U1 @@ -630,7 +615,7 @@ public class AuthorizationTest extends BaseTestClass { Assert.assertEquals(oldFeedUser, newFeedUser, "User should be the same"); //new process bundle should be created by U2 - OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false); + OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); String newProcessUser = getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same"); @@ -641,8 +626,7 @@ public class AuthorizationTest extends BaseTestClass { @Test(enabled = false) public void u1ScheduleFeedU2ScheduleDependantProcessU2UpdateFeed() throws Exception { FeedMerlin feed = new FeedMerlin(bundles[0].getInputFeedFromBundle()); - String process = bundles[0].getProcessData(); - process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z"); + bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z"); //submit both feeds bundles[0].submitClusters(prism); bundles[0].submitFeeds(prism); @@ -651,18 +635,18 @@ public class AuthorizationTest extends BaseTestClass { AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed.toString(), Job.Status.RUNNING); //by U2 schedule process dependent on scheduled feed by U1 - ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(process, + ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData(), MerlinConstants.USER2_NAME); AssertUtil.assertSucceeded(serviceResponse); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING); + AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); //update feed definition - FeedMerlin newFeed = new FeedMerlin(feed.toString()); + FeedMerlin newFeed = new FeedMerlin(feed); newFeed.setFeedPathValue(baseTestDir + "/randomPath" + MINUTE_DATE_PATTERN); //get old process details String oldProcessBundleId = InstanceUtil - .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS); + .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); String oldProcessUser = getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS); @@ -682,7 +666,7 @@ public class AuthorizationTest extends BaseTestClass { Assert.assertEquals(MerlinConstants.USER2_NAME, newFeedUser); //new process bundle should be created by U2 - OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false); + OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); String newProcessUser = getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS); Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same"); @@ -693,8 +677,7 @@ public class AuthorizationTest extends BaseTestClass { @Test(enabled = false) public void u1ScheduleFeedU1ScheduleDependantProcessU1UpdateProcess() throws Exception { String feed = bundles[0].getInputFeedFromBundle(); - String process = bundles[0].getProcessData(); - process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z"); + bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z"); //submit both feeds bundles[0].submitClusters(prism); bundles[0].submitFeeds(prism); @@ -703,13 +686,13 @@ public class AuthorizationTest extends BaseTestClass { AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); //by U1 schedule process dependent on scheduled feed by U1 - ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(process); + ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData()); AssertUtil.assertSucceeded(serviceResponse); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING); + AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); //get old process details String oldProcessBundleId = InstanceUtil - .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS); + .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); String oldProcessUser = getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS); @@ -718,16 +701,16 @@ public class AuthorizationTest extends BaseTestClass { .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED); //update process by U1 - ProcessMerlin processObj = new ProcessMerlin(process); + ProcessMerlin processObj = bundles[0].getProcessObject(); processObj.setProperty("randomProp", "randomVal"); - serviceResponse = prism.getProcessHelper().update(process, processObj.toString()); + serviceResponse = prism.getProcessHelper().update(bundles[0].getProcessData(), processObj.toString()); AssertUtil.assertSucceeded(serviceResponse); //new feed bundle should not be created OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false); //new process bundle should be created by U1 - OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false); + OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); String newProcessUser = getBundleUser(cluster, processObj.getName(), EntityType.PROCESS); Assert.assertEquals(oldProcessUser, newProcessUser, "User should be the same"); @@ -738,8 +721,7 @@ public class AuthorizationTest extends BaseTestClass { @Test(enabled = false) public void u1ScheduleFeedU1ScheduleDependantProcessU2UpdateProcess() throws Exception { String feed = bundles[0].getInputFeedFromBundle(); - String process = bundles[0].getProcessData(); - process = InstanceUtil.setProcessValidity(process, "2010-01-02T01:00Z", "2099-01-02T01:00Z"); + bundles[0].setProcessValidity("2010-01-02T01:00Z", "2099-01-02T01:00Z"); //submit both feeds bundles[0].submitClusters(prism); bundles[0].submitFeeds(prism); @@ -748,13 +730,13 @@ public class AuthorizationTest extends BaseTestClass { AssertUtil.checkStatus(clusterOC, EntityType.FEED, feed, Job.Status.RUNNING); //by U1 schedule process dependent on scheduled feed by U1 - ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(process); + ServiceResponse serviceResponse = prism.getProcessHelper().submitAndSchedule(bundles[0].getProcessData()); AssertUtil.assertSucceeded(serviceResponse); - AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, process, Job.Status.RUNNING); + AssertUtil.checkStatus(clusterOC, EntityType.PROCESS, bundles[0].getProcessData(), Job.Status.RUNNING); //get old process details String oldProcessBundleId = InstanceUtil - .getLatestBundleID(cluster, Util.readEntityName(process), EntityType.PROCESS); + .getLatestBundleID(cluster, bundles[0].getProcessName(), EntityType.PROCESS); String oldProcessUser = getBundleUser(cluster, bundles[0].getProcessName(), EntityType.PROCESS); @@ -763,9 +745,9 @@ public class AuthorizationTest extends BaseTestClass { .getLatestBundleID(cluster, Util.readEntityName(feed), EntityType.FEED); //update process by U2 - ProcessMerlin processObj = new ProcessMerlin(process); + ProcessMerlin processObj = bundles[0].getProcessObject(); processObj.setProperty("randomProp", "randomVal"); - serviceResponse = prism.getProcessHelper().update(process, processObj.toString(), + serviceResponse = prism.getProcessHelper().update(bundles[0].getProcessData(), processObj.toString(), TimeUtil.getTimeWrtSystemTime(0), MerlinConstants.USER2_NAME); AssertUtil.assertSucceeded(serviceResponse); @@ -773,7 +755,7 @@ public class AuthorizationTest extends BaseTestClass { OozieUtil.verifyNewBundleCreation(cluster, oldFeedBundleId, null, feed, false, false); //new process bundle should be created by U2 - OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, process, true, false); + OozieUtil.verifyNewBundleCreation(cluster, oldProcessBundleId, null, bundles[0].getProcessData(), true, false); String newProcessUser = getBundleUser(cluster, processObj.getName(), EntityType.PROCESS); Assert.assertNotEquals(oldProcessUser, newProcessUser, "User should not be the same"); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java index b7eb77f..33b0e77 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELExpCurrentAndLastWeekTest.java @@ -18,7 +18,6 @@ package org.apache.falcon.regression; -import org.apache.falcon.regression.Entities.ProcessMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.Frequency.TimeUnit; @@ -149,7 +148,7 @@ public class ELExpCurrentAndLastWeekTest extends BaseTestClass { private List<String> getMissingDependencies(ColoHelper prismHelper, Bundle bundle) throws OozieClientException { List<String> bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(), - new ProcessMerlin(bundle.getProcessData()).getName(), EntityType.PROCESS); + bundle.getProcessName(), EntityType.PROCESS); String coordID = bundles.get(0); List<String> missingDependencies = OozieUtil.getMissingDependencies(prismHelper, coordID); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java index 41e3002..07292e1 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ELValidationsTest.java @@ -19,7 +19,6 @@ package org.apache.falcon.regression; import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.regression.Entities.ProcessMerlin; import org.apache.falcon.regression.core.bundle.Bundle; import org.apache.falcon.regression.core.helpers.ColoHelper; import org.apache.falcon.regression.core.util.BundleUtil; @@ -157,7 +156,7 @@ public class ELValidationsTest extends BaseTestClass { List<String> bundles = null; for (int i = 0; i < 10; ++i) { bundles = OozieUtil.getBundles(prismHelper.getFeedHelper().getOozieClient(), - new ProcessMerlin(bundle.getProcessData()).getName(), EntityType.PROCESS); + bundle.getProcessName(), EntityType.PROCESS); if (bundles.size() > 0) { break; } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java index 6b227d6..8eff8e4 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/ExternalFSTest.java @@ -143,30 +143,30 @@ public class ExternalFSTest extends BaseTestClass{ new String[]{"${YEAR}", "${MONTH}", "${DAY}", "${HOUR}", "${MINUTE}"}, separator); //configure feed - String feed = bundles[0].getDataSets().get(0); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); String targetDataLocation = endpoint + testWasbTargetDir + datePattern; - feed = InstanceUtil.setFeedFilePath(feed, sourcePath + '/' + datePattern); + feed.setFilePath(sourcePath + '/' + datePattern); //erase all clusters from feed definition - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + feed.clearFeedClusters(); //set local cluster as source - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTime, endTime) .withClusterType(ClusterType.SOURCE) - .build()).toString(); + .build()); //set externalFS cluster as target - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(externalBundle.getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTime, endTime) .withClusterType(ClusterType.TARGET) .withDataLocation(targetDataLocation) - .build()).toString(); + .build()); //submit and schedule feed - LOGGER.info("Feed : " + Util.prettyPrintXml(feed)); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); + LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString())); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); datePattern = StringUtils.join(new String[]{"yyyy", "MM", "dd", "HH", "mm"}, separator); //upload necessary data DateTime date = new DateTime(startTime, DateTimeZone.UTC); @@ -181,15 +181,15 @@ public class ExternalFSTest extends BaseTestClass{ Path dstPath = new Path(endpoint + testWasbTargetDir + '/' + timePattern); //check if coordinator exists - InstanceUtil.waitTillInstancesAreCreated(cluster, feed, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster, feed.toString(), 0); Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster.getFeedHelper(), Util.readEntityName(feed), + .checkIfFeedCoordExist(cluster.getFeedHelper(), Util.readEntityName(feed.toString()), "REPLICATION"), 1); TimeUtil.sleepSeconds(10); //replication should start, wait while it ends - InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed), 1, + InstanceUtil.waitTillInstanceReachState(clusterOC, Util.readEntityName(feed.toString()), 1, CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); //check if data has been replicated correctly http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java index a6639ed..6dbe60e 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedInstanceListingTest.java @@ -77,7 +77,7 @@ public class FeedInstanceListingTest extends BaseTestClass{ bundles[0].setOutputFeedPeriodicity(5, Frequency.TimeUnit.minutes); bundles[0].setOutputFeedLocationData(feedOutputPath); bundles[0].setProcessPeriodicity(5, Frequency.TimeUnit.minutes); - processName = Util.readEntityName(bundles[0].getProcessData()); + processName = bundles[0].getProcessName(); HadoopUtil.uploadDir(clusterFS, aggregateWorkflowDir, OSUtil.RESOURCES_OOZIE); } http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java index 2c8346d..61ef8f3 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedLateRerunTest.java @@ -87,32 +87,32 @@ public class FeedLateRerunTest extends BaseTestClass { LOGGER.info("Time range between : " + startTime + " and " + endTime); //configure feed - String feed = bundles[0].getDataSets().get(0); - feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.setFilePath(feedDataLocation); //erase all clusters from feed definition - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + feed.clearFeedClusters(); //set cluster1 as source - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTime, endTime) .withClusterType(ClusterType.SOURCE) - .build()).toString(); + .build()); //set cluster2 as target - feed = FeedMerlin.fromString(feed).addFeedCluster( + feed.addFeedCluster( new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) .withRetention("days(1000000)", ActionType.DELETE) .withValidity(startTime, endTime) .withClusterType(ClusterType.TARGET) .withDataLocation(targetDataLocation) - .build()).toString(); - String entityName = Util.readEntityName(feed); + .build()); + String entityName = feed.getName(); //submit and schedule feed - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); //check if coordinator exists - InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0); Assert.assertEquals(InstanceUtil .checkIfFeedCoordExist(cluster2.getFeedHelper(), entityName, "REPLICATION"), 1); http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java index eb8c4fe..de4f692 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/FeedReplicationTest.java @@ -112,29 +112,29 @@ public class FeedReplicationTest extends BaseTestClass { LOGGER.info("Time range between : " + startTime + " and " + endTime); //configure feed - String feed = bundles[0].getDataSets().get(0); - feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.setFilePath(feedDataLocation); //erase all clusters from feed definition - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + feed.clearFeedClusters(); //set cluster1 as source - feed = FeedMerlin.fromString(feed).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.SOURCE) - .build()).toString(); + feed.addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.SOURCE) + .build()); //set cluster2 as target - feed = FeedMerlin.fromString(feed).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()).toString(); + feed.addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.TARGET) + .withDataLocation(targetDataLocation) + .build()); //submit and schedule feed - LOGGER.info("Feed : " + Util.prettyPrintXml(feed)); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); + LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString())); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); //upload necessary data DateTime date = new DateTime(startTime, DateTimeZone.UTC); @@ -152,15 +152,15 @@ public class FeedReplicationTest extends BaseTestClass { } //check if coordinator exists - InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0); Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), - "REPLICATION"), 1); + .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed.toString()), + "REPLICATION"), 1); //replication should start, wait while it ends - InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); + InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed.toString()), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); //check if data has been replicated correctly List<Path> cluster1ReplicatedData = HadoopUtil @@ -191,37 +191,37 @@ public class FeedReplicationTest extends BaseTestClass { LOGGER.info("Time range between : " + startTime + " and " + endTime); //configure feed - String feed = bundles[0].getDataSets().get(0); - feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.setFilePath(feedDataLocation); //erase all clusters from feed definition - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + feed.clearFeedClusters(); //set cluster1 as source - feed = FeedMerlin.fromString(feed).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.SOURCE) - .build()).toString(); + feed.addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.SOURCE) + .build()); //set cluster2 as target - feed = FeedMerlin.fromString(feed).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()).toString(); + feed.addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.TARGET) + .withDataLocation(targetDataLocation) + .build()); //set cluster3 as target - feed = FeedMerlin.fromString(feed).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()).toString(); + feed.addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[2].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.TARGET) + .withDataLocation(targetDataLocation) + .build()); //submit and schedule feed - LOGGER.info("Feed : " + Util.prettyPrintXml(feed)); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); + LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString())); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); //upload necessary data DateTime date = new DateTime(startTime, DateTimeZone.UTC); @@ -240,23 +240,23 @@ public class FeedReplicationTest extends BaseTestClass { } //check if all coordinators exist - InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0); - InstanceUtil.waitTillInstancesAreCreated(cluster3, feed, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster3, feed.toString(), 0); Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster2.getFeedHelper(), Util.readEntityName(feed), - "REPLICATION"), 1); + .checkIfFeedCoordExist(cluster2.getFeedHelper(), feed.getName(), + "REPLICATION"), 1); Assert.assertEquals(InstanceUtil - .checkIfFeedCoordExist(cluster3.getFeedHelper(), Util.readEntityName(feed), - "REPLICATION"), 1); + .checkIfFeedCoordExist(cluster3.getFeedHelper(), feed.getName(), + "REPLICATION"), 1); //replication on cluster 2 should start, wait till it ends - InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); + InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); //replication on cluster 3 should start, wait till it ends - InstanceUtil.waitTillInstanceReachState(cluster3OC, Util.readEntityName(feed), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); + InstanceUtil.waitTillInstanceReachState(cluster3OC, feed.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); //check if data has been replicated correctly List<Path> cluster1ReplicatedData = HadoopUtil @@ -299,29 +299,29 @@ public class FeedReplicationTest extends BaseTestClass { FeedMerlin feedElement = bundles[0].getFeedElement(feedName); feedElement.setAvailabilityFlag(availabilityFlagName); bundles[0].writeFeedElement(feedElement, feedName); - String feed = bundles[0].getDataSets().get(0); - feed = InstanceUtil.setFeedFilePath(feed, feedDataLocation); + FeedMerlin feed = new FeedMerlin(bundles[0].getDataSets().get(0)); + feed.setFilePath(feedDataLocation); //erase all clusters from feed definition - feed = FeedMerlin.fromString(feed).clearFeedClusters().toString(); + feed.clearFeedClusters(); //set cluster1 as source - feed = FeedMerlin.fromString(feed).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.SOURCE) - .build()).toString(); + feed.addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[0].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.SOURCE) + .build()); //set cluster2 as target - feed = FeedMerlin.fromString(feed).addFeedCluster( - new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) - .withRetention("days(1000000)", ActionType.DELETE) - .withValidity(startTime, endTime) - .withClusterType(ClusterType.TARGET) - .withDataLocation(targetDataLocation) - .build()).toString(); + feed.addFeedCluster( + new FeedMerlin.FeedClusterBuilder(Util.readEntityName(bundles[1].getClusters().get(0))) + .withRetention("days(1000000)", ActionType.DELETE) + .withValidity(startTime, endTime) + .withClusterType(ClusterType.TARGET) + .withDataLocation(targetDataLocation) + .build()); //submit and schedule feed - LOGGER.info("Feed : " + Util.prettyPrintXml(feed)); - AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed)); + LOGGER.info("Feed : " + Util.prettyPrintXml(feed.toString())); + AssertUtil.assertSucceeded(prism.getFeedHelper().submitAndSchedule(feed.toString())); //upload necessary data DateTime date = new DateTime(startTime, DateTimeZone.UTC); @@ -339,7 +339,7 @@ public class FeedReplicationTest extends BaseTestClass { } //check while instance is got created - InstanceUtil.waitTillInstancesAreCreated(cluster2, feed, 0); + InstanceUtil.waitTillInstancesAreCreated(cluster2, feed.toString(), 0); //check if coordinator exists Assert.assertEquals(InstanceUtil @@ -357,12 +357,12 @@ public class FeedReplicationTest extends BaseTestClass { OSUtil.OOZIE_EXAMPLE_INPUT_DATA + availabilityFlagName); //check if instance become running - InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1, - CoordinatorAction.Status.RUNNING, EntityType.FEED); + InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, + CoordinatorAction.Status.RUNNING, EntityType.FEED); //wait till instance succeed - InstanceUtil.waitTillInstanceReachState(cluster2OC, Util.readEntityName(feed), 1, - CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); + InstanceUtil.waitTillInstanceReachState(cluster2OC, feed.getName(), 1, + CoordinatorAction.Status.SUCCEEDED, EntityType.FEED); //check if data was replicated correctly List<Path> cluster1ReplicatedData = HadoopUtil http://git-wip-us.apache.org/repos/asf/falcon/blob/395675fb/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java ---------------------------------------------------------------------- diff --git a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java index 4ce6026..47523d8 100644 --- a/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java +++ b/falcon-regression/merlin/src/test/java/org/apache/falcon/regression/LogMoverTest.java @@ -149,17 +149,17 @@ public class LogMoverTest extends BaseTestClass { private boolean validate(boolean logFlag) throws Exception { String stagingDir= MerlinConstants.STAGING_LOCATION; String path=stagingDir+"/falcon/workflows/process/"+processName+"/logs"; - List<Path> logmoverPath = HadoopUtil + List<Path> logmoverPaths = HadoopUtil .getAllFilesRecursivelyHDFS(clusterFS, new Path(HadoopUtil.cutProtocol(path))); if (logFlag) { - for(int index=0; index < logmoverPath.size(); index++) { - if (logmoverPath.get(index).toString().contains("SUCCEEDED")) { + for (Path logmoverPath : logmoverPaths) { + if (logmoverPath.toString().contains("SUCCEEDED")) { return true; } } } else { - for(int index=0; index < logmoverPath.size(); index++) { - if (logmoverPath.get(index).toString().contains("FAILED")) { + for (Path logmoverPath : logmoverPaths) { + if (logmoverPath.toString().contains("FAILED")) { return true; } }
