Repository: falcon Updated Branches: refs/heads/master ce59dc2ea -> 2132a983a
FALCON-1516 Feed Retention support in Falcon Unit (Pavan Kolamuri) Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2f867bc2 Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2f867bc2 Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2f867bc2 Branch: refs/heads/master Commit: 2f867bc2fdfa9f29f7959cafe3e9050adf9c0975 Parents: e3893a8 Author: Pallavi Rao <[email protected]> Authored: Mon Oct 12 11:44:26 2015 +0530 Committer: Pallavi Rao <[email protected]> Committed: Mon Oct 12 11:44:26 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 2 + .../falcon/client/AbstractFalconClient.java | 30 ++++++++++++ .../org/apache/falcon/client/FalconClient.java | 10 ++-- .../java/org/apache/falcon/util/DateUtil.java | 11 ++++- .../resource/AbstractInstanceManager.java | 6 +-- .../apache/falcon/unit/FalconUnitClient.java | 17 +++++++ .../falcon/unit/LocalInstanceManager.java | 43 +++++++++++++++++ .../unit/LocalSchedulableEntityManager.java | 27 +++++++++++ .../apache/falcon/unit/FalconUnitTestBase.java | 51 +++++++++++++++++++- .../org/apache/falcon/unit/TestFalconUnit.java | 31 ++++++++++++ unit/src/test/resources/infeed.xml | 2 +- 11 files changed, 217 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 8ac9fb8..35cc484 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -17,6 +17,8 @@ Trunk (Unreleased) FALCON-1027 Falcon proxy user support(Sowmya Ramesh) IMPROVEMENTS + FALCON-1516 Feed Retention support in Falcon Unit(Pavan Kolamuri via Pallavi Rao) + FALCON-1231 Improve JobCompletionNotification Service(Pallavi Rao) FALCON-1157 Build error when using maven 3.3.x(Venkat Ramachandran via Pallavi Rao) http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java index 265e08c..2358289 100644 --- a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -17,10 +17,13 @@ */ package org.apache.falcon.client; +import org.apache.falcon.LifeCycle; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.InstancesResult; import java.io.IOException; +import java.util.List; /** * Abstract Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs @@ -51,4 +54,31 @@ public abstract class AbstractFalconClient { public abstract APIResult schedule(EntityType entityType, String entityName, String colo, Boolean skipDryRun, String doAsuser, String properties) throws FalconCLIException; + + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + + /** + * + * @param type entity type + * @param entity entity name + * @param start start time + * @param end end time + * @param colo colo name + * @param lifeCycles lifecycle of an entity (for ex : feed has replication,eviction). + * @param filterBy filter operation can be applied to results + * @param orderBy + * @param sortOrder sort order can be asc or desc + * @param offset offset while displaying results + * @param numResults num of Results to output + * @param doAsUser + * @return + * @throws FalconCLIException + */ + public abstract InstancesResult getStatusOfInstances(String type, String entity, + String start, String end, + String colo, List<LifeCycle> lifeCycles, String filterBy, + String orderBy, String sortOrder, + Integer offset, Integer numResults, + String doAsUser) throws FalconCLIException; + //RESUME CHECKSTYLE CHECK ParameterNumberCheck } http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 6c3a7a4..27510f6 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -576,12 +576,10 @@ public class FalconClient extends AbstractFalconClient { } public InstancesResult getLogsOfInstances(String type, String entity, String start, - String end, String colo, String runId, - List<LifeCycle> lifeCycles, String filterBy, - String orderBy, String sortOrder, Integer offset, - Integer numResults, String doAsUser) - throws FalconCLIException { - + String end, String colo, String runId, + List<LifeCycle> lifeCycles, String filterBy, + String orderBy, String sortOrder, Integer offset, + Integer numResults, String doAsUser) throws FalconCLIException { return sendInstanceRequest(Instances.LOG, type, entity, start, end, null, runId, colo, lifeCycles, filterBy, orderBy, sortOrder, offset, numResults, doAsUser) .getEntity(InstancesResult.class); http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/common/src/main/java/org/apache/falcon/util/DateUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java b/common/src/main/java/org/apache/falcon/util/DateUtil.java index f89ef64..82163cc 100644 --- a/common/src/main/java/org/apache/falcon/util/DateUtil.java +++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java @@ -15,9 +15,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.falcon.util; +import org.apache.falcon.entity.v0.SchemaHelper; + import java.util.Calendar; import java.util.Date; import java.util.TimeZone; @@ -30,14 +31,20 @@ public final class DateUtil { //Friday, April 16, 9999 7:12:55 AM UTC corresponding date public static final Date NEVER = new Date(Long.parseLong("253379862775000")); + public static final long HOUR_IN_MILLIS = 60 * 60 * 1000; + private DateUtil() {} public static Date getNextMinute(Date time) throws Exception { Calendar insCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); insCal.setTime(time); - insCal.add(Calendar.MINUTE, 1); return insCal.getTime(); + } + public static String getDateFormatFromTime(long milliSeconds) { + return SchemaHelper.getDateFormat().format((new Date(milliSeconds))); + + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index c1b4da6..606f741 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -842,8 +842,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { return getStartAndEndDate(entityObject, startStr, endStr, getDefaultResultsPerPage()); } - private Pair<Date, Date> getStartAndEndDate(Entity entityObject, String startStr, String endStr, Integer numResults) - throws FalconException { + protected Pair<Date, Date> getStartAndEndDate(Entity entityObject, String startStr, String endStr, + Integer numResults) throws FalconException { Pair<Date, Date> clusterStartEndDates = EntityUtil.getEntityStartEndDates(entityObject); Frequency frequency = EntityUtil.getFrequency(entityObject); Date endDate = getEndDate(endStr, clusterStartEndDates.second); @@ -909,7 +909,7 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { return start; } - private void validateParams(String type, String entity) throws FalconException { + protected void validateParams(String type, String entity) throws FalconException { validateNotEmpty("entityType", type); validateNotEmpty("entityName", entity); } http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java index 169614b..3ce261e 100644 --- a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -57,10 +57,15 @@ public class FalconUnitClient extends AbstractFalconClient { protected ConfigurationStore configStore; private AbstractWorkflowEngine workflowEngine; + private LocalSchedulableEntityManager localSchedulableEntityManager; + private LocalInstanceManager localInstanceManager; + public FalconUnitClient() throws FalconException { configStore = ConfigurationStore.get(); workflowEngine = WorkflowEngineFactory.getWorkflowEngine(); + localSchedulableEntityManager = new LocalSchedulableEntityManager(); + localInstanceManager = new LocalInstanceManager(); } public ConfigurationStore getConfigStore() { @@ -123,6 +128,18 @@ public class FalconUnitClient extends AbstractFalconClient { return schedule(entityType, entityName, null, 0, cluster, skipDryRun, properties); } + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + @Override + public InstancesResult getStatusOfInstances(String type, String entity, String start, String end, + String colo, List<LifeCycle> lifeCycles, String filterBy, + String orderBy, String sortOrder, Integer offset, + Integer numResults, String doAsUser) throws FalconCLIException { + return localInstanceManager.getStatusOfInstances(type, entity, start, end, colo, lifeCycles, filterBy, orderBy, + sortOrder, offset, numResults); + + } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck + /** * Schedules an submitted process entity immediately. http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java new file mode 100644 index 0000000..1503b28 --- /dev/null +++ b/unit/src/main/java/org/apache/falcon/unit/LocalInstanceManager.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.unit; + +import org.apache.falcon.LifeCycle; +import org.apache.falcon.resource.AbstractInstanceManager; +import org.apache.falcon.resource.InstancesResult; + +import java.util.List; + +/** + * A proxy implementation of the entity instance operations. + */ +public class LocalInstanceManager extends AbstractInstanceManager { + + public LocalInstanceManager() {} + + //SUSPEND CHECKSTYLE CHECK ParameterNumberCheck + public InstancesResult getStatusOfInstances(String type, String entity, String start, String end, + String colo, List<LifeCycle> lifeCycles, String filterBy, + String orderBy, String sortOrder, Integer offset, + Integer numResults) { + return super.getStatus(type, entity, start, end, colo, lifeCycles, filterBy, orderBy, sortOrder, + offset, numResults); + } + //RESUME CHECKSTYLE CHECK ParameterNumberCheck + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java new file mode 100644 index 0000000..d793cf2 --- /dev/null +++ b/unit/src/main/java/org/apache/falcon/unit/LocalSchedulableEntityManager.java @@ -0,0 +1,27 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.unit; + +import org.apache.falcon.resource.AbstractSchedulableEntityManager; + +/** + * A proxy implementation of the schedulable entity operations in local mode. + */ +public class LocalSchedulableEntityManager extends AbstractSchedulableEntityManager { + // Created for future purposes to add all entity API's here for falcon unit. +} http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java index 995af2b..45b88f0 100644 --- a/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java +++ b/unit/src/test/java/org/apache/falcon/unit/FalconUnitTestBase.java @@ -20,12 +20,15 @@ package org.apache.falcon.unit; import org.apache.commons.io.FileUtils; import org.apache.commons.lang.StringUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; import org.apache.falcon.client.FalconCLIException; +import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.FeedHelper; import org.apache.falcon.entity.Storage; import org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.feed.LocationType; import org.apache.falcon.entity.v0.process.Process; @@ -34,6 +37,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.hadoop.JailedFileSystem; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.util.DateUtil; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +55,10 @@ import java.io.BufferedReader; import java.io.FileOutputStream; import java.io.FileReader; import java.text.ParseException; +import java.text.SimpleDateFormat; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Properties; import java.util.TimeZone; @@ -74,6 +81,14 @@ public class FalconUnitTestBase { boolean evaluate() throws Exception; } + public static final ThreadLocal<SimpleDateFormat> FORMATTER = new ThreadLocal<SimpleDateFormat>() { + @Override + protected SimpleDateFormat initialValue() { + SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm'Z'"); + return format; + } + }; + private static final Logger LOG = LoggerFactory.getLogger(FalconUnitTestBase.class); private static final String DEFAULT_CLUSTER = "local"; @@ -85,6 +100,7 @@ public class FalconUnitTestBase { private static final String WORKING_PATH = "/projects/falcon/working"; public static final Pattern VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##"); + protected static final int WAIT_TIME = 90000; protected static FalconUnitClient falconUnitClient; protected static JailedFileSystem fs; protected static ConfigurationStore configStore; @@ -170,6 +186,15 @@ public class FalconUnitTestBase { skipDryRun, properties); } + public APIResult schedule(EntityType entityType, String entityName, String cluster) throws FalconException, + FalconCLIException { + Entity entity = configStore.get(entityType, entityName); + if (entity == null) { + throw new FalconException("Process not found " + entityName); + } + return falconUnitClient.schedule(entityType, entityName, cluster, false, null, null); + } + private Map<String, String> updateColoAndCluster(String colo, String cluster, Map<String, String> props) { if (props == null) { props = new HashMap<>(); @@ -304,7 +329,7 @@ public class FalconUnitTestBase { } protected long waitForStatus(final EntityType entityType, final String entityName, final String instanceTime) { - return waitFor(90000, new Predicate() { + return waitFor(WAIT_TIME, new Predicate() { public boolean evaluate() throws Exception { InstancesResult.WorkflowStatus status = falconUnitClient.getInstanceStatus(entityType, entityName, instanceTime); @@ -317,4 +342,28 @@ public class FalconUnitTestBase { Assert.assertEquals(APIResult.Status.SUCCEEDED, apiResult.getStatus()); } + public InstancesResult.WorkflowStatus getRetentionStatus(String feedName, String cluster) throws FalconException, + FalconCLIException { + Feed feedEntity = EntityUtil.getEntity(EntityType.FEED, feedName); + Frequency.TimeUnit timeUnit = feedEntity.getFrequency().getTimeUnit(); + long endTimeInMillis = System.currentTimeMillis() + 30000; + String endTime = DateUtil.getDateFormatFromTime(endTimeInMillis); + long startTimeInMillis; + if (timeUnit == Frequency.TimeUnit.hours || timeUnit == Frequency.TimeUnit.minutes) { + startTimeInMillis = endTimeInMillis - (6 * DateUtil.HOUR_IN_MILLIS); + } else { + startTimeInMillis = endTimeInMillis - (24 * DateUtil.HOUR_IN_MILLIS); + } + String startTime = DateUtil.getDateFormatFromTime(startTimeInMillis); + List<LifeCycle> lifecycles = new ArrayList<>(); + lifecycles.add(LifeCycle.EVICTION); + InstancesResult result = falconUnitClient.getStatusOfInstances("feed", + feedName, startTime, endTime, cluster, + lifecycles, null, "status", "asc", 0, 1, null); + if (result.getInstances() != null && result.getInstances().length > 0) { + return result.getInstances()[0].getStatus(); + } + return null; + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java index fa9c664..d2e574b 100644 --- a/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java +++ b/unit/src/test/java/org/apache/falcon/unit/TestFalconUnit.java @@ -17,6 +17,8 @@ */ package org.apache.falcon.unit; +import org.apache.falcon.FalconException; +import org.apache.falcon.client.FalconCLIException; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.resource.APIResult; import org.apache.falcon.resource.InstancesResult; @@ -25,6 +27,9 @@ import org.apache.hadoop.fs.Path; import org.testng.Assert; import org.testng.annotations.Test; +import java.io.IOException; +import java.text.ParseException; + /** * Test cases of falcon jobs using Local Oozie and LocalJobRunner. */ @@ -56,4 +61,30 @@ public class TestFalconUnit extends FalconUnitTestBase { FileStatus[] files = getFileSystem().listStatus(new Path(outPath)); Assert.assertTrue(files.length > 0); } + + + @Test + public void testRetention() throws IOException, FalconCLIException, FalconException, + ParseException, InterruptedException { + // submit with default props + submitCluster(); + // submitting feeds + APIResult result = submit(EntityType.FEED, getAbsolutePath("/infeed.xml")); + assertStatus(result); + String scheduleTime = "2015-06-20T00:00Z"; + createData("in", "local", scheduleTime, "input.txt"); + String inPath = getFeedPathForTS("local", "in", scheduleTime); + Assert.assertTrue(fs.exists(new Path(inPath))); + result = schedule(EntityType.FEED, "in", "local"); + Assert.assertEquals(APIResult.Status.SUCCEEDED, result.getStatus()); + waitFor(WAIT_TIME, new Predicate() { + public boolean evaluate() throws Exception { + InstancesResult.WorkflowStatus status = getRetentionStatus("in", "local"); + return InstancesResult.WorkflowStatus.SUCCEEDED.equals(status); + } + }); + InstancesResult.WorkflowStatus status = getRetentionStatus("in", "local"); + Assert.assertEquals(InstancesResult.WorkflowStatus.SUCCEEDED, status); + Assert.assertFalse(fs.exists(new Path(inPath))); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/2f867bc2/unit/src/test/resources/infeed.xml ---------------------------------------------------------------------- diff --git a/unit/src/test/resources/infeed.xml b/unit/src/test/resources/infeed.xml index 509d868..62f1ba7 100644 --- a/unit/src/test/resources/infeed.xml +++ b/unit/src/test/resources/infeed.xml @@ -26,7 +26,7 @@ <clusters> <cluster name="local"> <validity start="2013-01-01T00:00Z" end="2030-01-01T00:00Z"/> - <retention limit="hours(400000)" action="delete"/> + <retention limit="hours(200)" action="delete"/> </cluster> </clusters>
