FALCON-1297 Falcon Unit which supports Submit and Schedule of jobs. Contributed by Pavan Kumar Kolamuri.
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/8cdac2bb Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/8cdac2bb Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/8cdac2bb Branch: refs/heads/0.7 Commit: 8cdac2bb151f9e727a4a7e13d21d0bc83debc120 Parents: 22c637f Author: Ajay Yadava <[email protected]> Authored: Tue Aug 4 17:13:47 2015 +0530 Committer: Ajay Yadav <[email protected]> Committed: Sat Aug 8 20:06:40 2015 +0530 ---------------------------------------------------------------------- CHANGES.txt | 8 +- .../falcon/client/AbstractFalconClient.java | 53 +++ .../org/apache/falcon/client/FalconClient.java | 2 +- .../entity/parser/ClusterEntityParser.java | 13 +- .../falcon/entity/store/ConfigurationStore.java | 54 ++- .../java/org/apache/falcon/util/DateUtil.java | 39 ++ .../falcon/workflow/util/OozieConstants.java | 33 ++ .../apache/falcon/hadoop/JailedFileSystem.java | 2 +- .../apache/falcon/oozie/OozieEntityBuilder.java | 8 +- .../workflow/engine/OozieClientFactory.java | 20 +- .../workflow/engine/OozieWorkflowEngine.java | 21 +- .../oozie/client/LocalOozieClientBundle.java | 382 +++++++++++++++++++ .../oozie/client/LocalProxyOozieClient.java | 188 +++++++++ pom.xml | 1 + unit/pom.xml | 106 +++++ .../java/org/apache/falcon/unit/FalconUnit.java | 215 +++++++++++ .../apache/falcon/unit/FalconUnitClient.java | 250 ++++++++++++ .../apache/falcon/unit/FalconUnitHelper.java | 100 +++++ .../unit/LocalFalconClientProtocolProvider.java | 62 +++ ...op.mapreduce.protocol.ClientProtocolProvider | 18 + unit/src/main/resources/core-site.xml | 38 ++ unit/src/main/resources/deploy.properties | 21 + .../main/resources/localoozie-log4j.properties | 34 ++ unit/src/main/resources/log4j.xml | 91 +++++ unit/src/main/resources/mapred-site.xml | 35 ++ unit/src/main/resources/oozie-site.xml | 170 +++++++++ unit/src/main/resources/startup.properties | 129 +++++++ 
.../apache/falcon/unit/FalconUnitTestBase.java | 317 +++++++++++++++ .../org/apache/falcon/unit/TestFalconUnit.java | 58 +++ .../falcon/unit/examples/JavaExample.java | 65 ++++ unit/src/test/resources/cluster-template.xml | 36 ++ unit/src/test/resources/infeed.xml | 39 ++ unit/src/test/resources/input.txt | 18 + unit/src/test/resources/outfeed.xml | 39 ++ unit/src/test/resources/process.xml | 50 +++ unit/src/test/resources/workflow.xml | 43 +++ .../falcon/resource/EntityManagerJerseyIT.java | 6 +- .../resource/ProcessInstanceManagerIT.java | 3 +- .../org/apache/falcon/util/OozieTestUtils.java | 17 +- 39 files changed, 2722 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/CHANGES.txt ---------------------------------------------------------------------- diff --git a/CHANGES.txt b/CHANGES.txt index 50ce4d2..e1eae4f 100755 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -4,11 +4,15 @@ Trunk (Unreleased) INCOMPATIBLE CHANGES NEW FEATURES + FALCON-1297 Falcon Unit which supports Submit and Schedule of jobs(Pavan Kumar Kolamuri via Ajay Yadava) + FALCON-1039 Add instance dependency API in falcon (Ajay Yadava) FALCON-796 Enable users to triage data processing issues through falcon (Ajay Yadava) IMPROVEMENTS + FALCON-1322 Add prefix in runtime.properties(Sandeep Samudrala via Ajay Yadava) + FALCON-1317 Inconsistent JSON serialization(Ajay Yadava) FALCON-1324 Pagination API breaks backward compatibility(Ajay Yadava). @@ -150,9 +154,7 @@ Release Version: 0.6.1 FALCON-822 Add reverse look up API (Ajay Yadava via Suhas Vasu) IMPROVEMENTS - FALCON-1322 Add prefix in runtime.properties(Sandeep Samudrala via Ajay Yadava) - - FALCON-1280 Update docs/license licenses with right copyright + FALCON-1280 Update docs/license licenses with right copyright information (Shaik Idris Ali) FALCON-1276 Verify licensing in html5-ui module. 
http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java new file mode 100644 index 0000000..bb6d8c9 --- /dev/null +++ b/client/src/main/java/org/apache/falcon/client/AbstractFalconClient.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.client; + +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.resource.APIResult; + +import java.io.IOException; + +/** + * Abstract Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs + * against an Falcon instance. + */ +public abstract class AbstractFalconClient { + + /** + * Submit a new entity. Entities can be of type feed, process or data end + * points. Entity definitions are validated structurally against schema and + * subsequently for other rules before they are admitted into the system. 
+ * @param entityType + * @param filePath + * @return + * @throws FalconCLIException + */ + public abstract APIResult submit(String entityType, String filePath) throws FalconCLIException, + IOException; + + /** + * Schedules a submitted process entity immediately. + * @param entityType + * @param entityName + * @param colo + * @return + * @throws FalconCLIException + */ + public abstract APIResult schedule(EntityType entityType, String entityName, String colo) throws FalconCLIException; + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index d507371..9649e10 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -74,7 +74,7 @@ import java.util.Properties; * Client API to submit and manage Falcon Entities (Cluster, Feed, Process) jobs * against an Falcon instance. 
*/ -public class FalconClient { +public class FalconClient extends AbstractFalconClient { public static final String WS_HEADER_PREFIX = "header:"; public static final String USER = System.getProperty("user.name"); http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java index 59b0910..5756f84 100644 --- a/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java +++ b/common/src/main/java/org/apache/falcon/entity/parser/ClusterEntityParser.java @@ -34,6 +34,7 @@ import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowEngineFactory; +import org.apache.falcon.workflow.util.OozieConstants; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -46,6 +47,7 @@ import org.slf4j.LoggerFactory; import javax.jms.ConnectionFactory; import java.io.IOException; +import java.net.URI; /** * Parser that parses cluster entity definition. 
@@ -92,7 +94,12 @@ public class ClusterEntityParser extends EntityParser<Cluster> { private void validateScheme(Cluster cluster, Interfacetype interfacetype) throws ValidationException { final String endpoint = ClusterHelper.getInterface(cluster, interfacetype).getEndpoint(); - if (new Path(endpoint).toUri().getScheme() == null) { + URI uri = new Path(endpoint).toUri(); + if (uri.getScheme() == null) { + if (Interfacetype.WORKFLOW == interfacetype + && uri.toString().equals(OozieConstants.LOCAL_OOZIE)) { + return; + } throw new ValidationException("Cannot get valid scheme for interface: " + interfacetype + " of cluster: " + cluster.getName()); } @@ -146,7 +153,9 @@ public class ClusterEntityParser extends EntityParser<Cluster> { protected void validateWorkflowInterface(Cluster cluster) throws ValidationException { final String workflowUrl = ClusterHelper.getOozieUrl(cluster); LOG.info("Validating workflow interface: {}", workflowUrl); - + if (OozieConstants.LOCAL_OOZIE.equals(workflowUrl)) { + return; + } try { if (!WorkflowEngineFactory.getWorkflowEngine().isAlive(cluster)) { throw new ValidationException("Unable to reach Workflow server:" + workflowUrl); http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java index b5f531a..7b53ebb 100644 --- a/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java +++ b/common/src/main/java/org/apache/falcon/entity/store/ConfigurationStore.java @@ -61,6 +61,7 @@ public final class ConfigurationStore implements FalconService { private static final Logger LOG = LoggerFactory.getLogger(ConfigurationStore.class); private static final Logger AUDIT = LoggerFactory.getLogger("AUDIT"); private static 
final String UTF_8 = CharEncoding.UTF_8; + private final boolean shouldPersist; private static final FsPermission STORE_PERMISSION = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE); @@ -93,17 +94,20 @@ public final class ConfigurationStore implements FalconService { return STORE; } - private final FileSystem fs; - private final Path storePath; + private FileSystem fs; + private Path storePath; private ConfigurationStore() { for (EntityType type : EntityType.values()) { dictionary.put(type, new ConcurrentHashMap<String, Entity>()); } - String uri = StartupProperties.get().getProperty("config.store.uri"); - storePath = new Path(uri); - fs = initializeFileSystem(); + shouldPersist = Boolean.parseBoolean(StartupProperties.get().getProperty("config.store.persist", "true")); + if (shouldPersist) { + String uri = StartupProperties.get().getProperty("config.store.uri"); + storePath = new Path(uri); + fs = initializeFileSystem(); + } } /** @@ -140,24 +144,26 @@ public final class ConfigurationStore implements FalconService { registerListener(listener); } - try { - for (EntityType type : ENTITY_LOAD_ORDER) { - ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type); - FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*")); - if (files != null) { - for (FileStatus file : files) { - String fileName = file.getPath().getName(); - String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop - // ".xml" - String entityName = URLDecoder.decode(encodedEntityName, UTF_8); - Entity entity = restore(type, entityName); - entityMap.put(entityName, entity); - onReload(entity); + if (shouldPersist) { + try { + for (EntityType type : ENTITY_LOAD_ORDER) { + ConcurrentHashMap<String, Entity> entityMap = dictionary.get(type); + FileStatus[] files = fs.globStatus(new Path(storePath, type.name() + Path.SEPARATOR + "*")); + if (files != null) { + for (FileStatus file : files) { + String fileName = 
file.getPath().getName(); + String encodedEntityName = fileName.substring(0, fileName.length() - 4); // drop + // ".xml" + String entityName = URLDecoder.decode(encodedEntityName, UTF_8); + Entity entity = restore(type, entityName); + entityMap.put(entityName, entity); + onReload(entity); + } } } + } catch (IOException e) { + throw new FalconException("Unable to restore configurations", e); } - } catch (IOException e) { - throw new FalconException("Unable to restore configurations", e); } } @@ -261,7 +267,7 @@ public final class ConfigurationStore implements FalconService { return (T) updatesInProgress.get(); } T entity = (T) entityMap.get(name); - if (entity == NULL) { // Object equality being checked + if (entity == NULL && shouldPersist) { // Object equality being checked try { entity = this.restore(type, name); } catch (IOException e) { @@ -322,6 +328,9 @@ public final class ConfigurationStore implements FalconService { * @throws FalconException */ private void persist(EntityType type, Entity entity) throws IOException, FalconException { + if (!shouldPersist) { + return; + } OutputStream out = fs .create(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(entity.getName(), UTF_8) + ".xml")); @@ -344,6 +353,9 @@ public final class ConfigurationStore implements FalconService { * @throws IOException If any error in accessing the storage */ private void archive(EntityType type, String name) throws IOException { + if (!shouldPersist) { + return; + } Path archivePath = new Path(storePath, "archive" + Path.SEPARATOR + type); HadoopClientFactory.mkdirs(fs, archivePath, STORE_PERMISSION); fs.rename(new Path(storePath, type + Path.SEPARATOR + URLEncoder.encode(name, UTF_8) + ".xml"), http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/common/src/main/java/org/apache/falcon/util/DateUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/DateUtil.java 
b/common/src/main/java/org/apache/falcon/util/DateUtil.java new file mode 100644 index 0000000..e736340 --- /dev/null +++ b/common/src/main/java/org/apache/falcon/util/DateUtil.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.util; + +import java.util.Calendar; +import java.util.Date; +import java.util.TimeZone; + +/** + * Helper to get date operations. 
+ */ +public final class DateUtil { + + private DateUtil() {} + + public static Date getNextMinute(Date time) throws Exception { + Calendar insCal = Calendar.getInstance(TimeZone.getTimeZone("UTC")); + insCal.setTime(time); + + insCal.add(Calendar.MINUTE, 1); + return insCal.getTime(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java b/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java new file mode 100644 index 0000000..05f248e --- /dev/null +++ b/common/src/main/java/org/apache/falcon/workflow/util/OozieConstants.java @@ -0,0 +1,33 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.workflow.util; + +/** + * Oozie Constants used across multiple modules. + */ +public final class OozieConstants { + /** + * Constant for the oozie running in local. 
+ */ + public static final String LOCAL_OOZIE = "localoozie"; + + private OozieConstants() { + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java index 7156bbd..d5b2eb3 100644 --- a/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java +++ b/hadoop-dependencies/src/main/java/org/apache/falcon/hadoop/JailedFileSystem.java @@ -58,7 +58,7 @@ public class JailedFileSystem extends FileSystem { throw new IOException("Incomplete Jail URI, no jail base: "+ name); } basePath = new Path(conf.get("jail.base", System.getProperty("hadoop.tmp.dir", - System.getProperty("user.dir") + "/webapp/target/tmp-hadoop-" + System.getProperty("user.dir") + "/target/falcon/tmp-hadoop-" + System.getProperty("user.name"))) + "/jail-fs/" + base).toUri().getPath(); this.uri = URI.create(name.getScheme()+"://"+name.getAuthority()); } http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java index 9ca0ac1..9a6b14c 100644 --- a/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java +++ b/oozie/src/main/java/org/apache/falcon/oozie/OozieEntityBuilder.java @@ -26,6 +26,7 @@ import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.cluster.Cluster; import org.apache.falcon.entity.v0.cluster.ClusterLocationType; +import org.apache.falcon.entity.v0.cluster.Interfacetype; 
import org.apache.falcon.entity.v0.cluster.Property; import org.apache.falcon.entity.v0.feed.Feed; import org.apache.falcon.entity.v0.process.Output; @@ -38,6 +39,7 @@ import org.apache.falcon.service.FalconPathFilter; import org.apache.falcon.service.SharedLibraryHostingService; import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; +import org.apache.falcon.workflow.util.OozieConstants; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -168,8 +170,10 @@ public abstract class OozieEntityBuilder<T extends Entity> { properties.setProperty(AbstractWorkflowEngine.NAME_NODE, ClusterHelper.getStorageUrl(cluster)); properties.setProperty(AbstractWorkflowEngine.JOB_TRACKER, ClusterHelper.getMREndPoint(cluster)); properties.setProperty("colo.name", cluster.getColo()); - - properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true"); + final String endpoint = ClusterHelper.getInterface(cluster, Interfacetype.WORKFLOW).getEndpoint(); + if (!OozieConstants.LOCAL_OOZIE.equals(endpoint)) { + properties.setProperty(OozieClient.USE_SYSTEM_LIBPATH, "true"); + } properties.setProperty("falcon.libpath", ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath() + "/lib"); http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java index 622238a..ae5c5fa 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieClientFactory.java @@ -23,7 +23,11 @@ import org.apache.falcon.entity.ClusterHelper; import 
org.apache.falcon.entity.store.ConfigurationStore; import org.apache.falcon.entity.v0.EntityType; import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.workflow.util.OozieConstants; +import org.apache.oozie.client.LocalProxyOozieClient; +import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.ProxyOozieClient; +import org.apache.oozie.local.LocalOozie; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,13 +37,12 @@ import org.slf4j.LoggerFactory; public final class OozieClientFactory { private static final Logger LOG = LoggerFactory.getLogger(OozieClientFactory.class); - private static final String LOCAL_OOZIE = "local"; private static volatile boolean localInitialized = false; private OozieClientFactory() {} - public static synchronized ProxyOozieClient get(Cluster cluster) + public static synchronized OozieClient get(Cluster cluster) throws FalconException { assert cluster != null : "Cluster cant be null"; @@ -48,28 +51,27 @@ public final class OozieClientFactory { return getClientRef(oozieUrl); } - public static ProxyOozieClient get(String clusterName) throws FalconException { + public static OozieClient get(String clusterName) throws FalconException { return get((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, clusterName)); } - private static ProxyOozieClient getClientRef(String oozieUrl) + private static OozieClient getClientRef(String oozieUrl) throws FalconException { - if (LOCAL_OOZIE.equals(oozieUrl)) { + if (OozieConstants.LOCAL_OOZIE.equals(oozieUrl)) { return getLocalOozieClient(); } else { return new ProxyOozieClient(oozieUrl); } } - private static ProxyOozieClient getLocalOozieClient() throws FalconException { + private static OozieClient getLocalOozieClient() throws FalconException { try { if (!localInitialized) { - //LocalOozie.start(); + LocalOozie.start(); localInitialized = true; } - //return LocalOozie.getClient(); - return null; + return new LocalProxyOozieClient(); } catch 
(Exception e) { throw new FalconException(e); } http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 4085b8f..2f3dc6f 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -58,7 +58,6 @@ import org.apache.oozie.client.Job; import org.apache.oozie.client.Job.Status; import org.apache.oozie.client.OozieClient; import org.apache.oozie.client.OozieClientException; -import org.apache.oozie.client.ProxyOozieClient; import org.apache.oozie.client.WorkflowAction; import org.apache.oozie.client.WorkflowJob; import org.apache.oozie.client.rest.RestConstants; @@ -210,7 +209,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { private void dryRunInternal(Cluster cluster, Path buildPath) throws FalconException { BUNDLEAPP bundle = OozieBundleBuilder.unmarshal(cluster, buildPath); - ProxyOozieClient client = OozieClientFactory.get(cluster.getName()); + OozieClient client = OozieClientFactory.get(cluster.getName()); for (COORDINATOR coord : bundle.getCoordinator()) { Properties props = new Properties(); props.setProperty(OozieClient.COORDINATOR_APP_PATH, coord.getAppPath()); @@ -396,7 +395,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private void killBundle(String clusterName, BundleJob job) throws FalconException { - ProxyOozieClient client = OozieClientFactory.get(clusterName); + OozieClient client = OozieClientFactory.get(clusterName); try { //kill all coords for (CoordinatorJob coord : job.getCoordinators()) { @@ -459,7 +458,7 @@ public class OozieWorkflowEngine extends 
AbstractWorkflowEngine { List<Instance> runInstances = new ArrayList<Instance>(); for (String cluster : clusters) { - ProxyOozieClient client = OozieClientFactory.get(cluster); + OozieClient client = OozieClientFactory.get(cluster); List<String> wfNames = EntityUtil.getWorkflowNames(entity); List<WorkflowJob> wfs = getRunningWorkflows(cluster, wfNames); if (wfs != null) { @@ -615,7 +614,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } List<BundleJob> bundles = entry.getValue(); - ProxyOozieClient client = OozieClientFactory.get(cluster); + OozieClient client = OozieClientFactory.get(cluster); List<CoordinatorJob> applicableCoords = getApplicableCoords(client, start, end, bundles, lifeCycles); long unscheduledInstances = 0; @@ -901,7 +900,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { for (Map.Entry<String, List<BundleJob>> entry : bundlesMap.entrySet()) { String cluster = entry.getKey(); List<BundleJob> bundles = entry.getValue(); - ProxyOozieClient client = OozieClientFactory.get(cluster); + OozieClient client = OozieClientFactory.get(cluster); List<CoordinatorJob> applicableCoords = getApplicableCoords(client, start, end, bundles, lifeCycles); List<CoordinatorAction> actions = new ArrayList<CoordinatorAction>(); @@ -947,7 +946,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { return coord.getAppName().contains(LifeCycle.EVICTION.getTag().name()); } - private void addCoordAction(ProxyOozieClient client, List<CoordinatorAction> actions, String actionId) { + private void addCoordAction(OozieClient client, List<CoordinatorAction> actions, String actionId) { CoordinatorAction coordActionInfo = null; try { coordActionInfo = client.getCoordActionInfo(actionId); @@ -984,7 +983,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } - private List<CoordinatorJob> getApplicableCoords(ProxyOozieClient client, Date start, Date end, + private List<CoordinatorJob> 
getApplicableCoords(OozieClient client, Date start, Date end, List<BundleJob> bundles, List<LifeCycle> lifeCycles) throws FalconException { List<CoordinatorJob> applicableCoords = new ArrayList<CoordinatorJob>(); @@ -1323,7 +1322,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { @Override public void reRun(String cluster, String jobId, Properties props, boolean isForced) throws FalconException { - ProxyOozieClient client = OozieClientFactory.get(cluster); + OozieClient client = OozieClientFactory.get(cluster); try { WorkflowJob jobInfo = client.getJobInfo(jobId); Properties jobprops = OozieUtils.toProperties(jobInfo.getConf()); @@ -1385,7 +1384,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { @Override public String getWorkflowStatus(String cluster, String jobId) throws FalconException { - ProxyOozieClient client = OozieClientFactory.get(cluster); + OozieClient client = OozieClientFactory.get(cluster); try { if (jobId.endsWith("-W")) { WorkflowJob jobInfo = client.getJobInfo(jobId); @@ -1489,7 +1488,7 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { // assert that its really changed try { - ProxyOozieClient client = OozieClientFactory.get(cluster); + OozieClient client = OozieClientFactory.get(cluster); CoordinatorJob coord = client.getCoordJobInfo(id); for (int counter = 0; counter < 3; counter++) { Date intendedPauseTime = (StringUtils.isEmpty(pauseTime) ? 
null : SchemaHelper.parseDateUTC(pauseTime)); http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientBundle.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientBundle.java b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientBundle.java new file mode 100644 index 0000000..93b4337 --- /dev/null +++ b/oozie/src/main/java/org/apache/oozie/client/LocalOozieClientBundle.java @@ -0,0 +1,382 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.client; + +import org.apache.oozie.BaseEngineException; +import org.apache.oozie.BundleEngine; +import org.apache.oozie.BundleEngineException; +import org.apache.oozie.BundleJobBean; +import org.apache.oozie.BundleJobInfo; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.util.XConfiguration; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; + + +/** + * Client API to submit and manage Oozie bundle jobs against an Oozie + * instance. 
+ */ +public class LocalOozieClientBundle extends OozieClient { + + private final BundleEngine bundleEngine; + + /** + * Create a bundle client for Oozie local use. + * <p/> + * + * @param bundleEngine the engine instance to use. + */ + public LocalOozieClientBundle(BundleEngine bundleEngine) { + this.bundleEngine = bundleEngine; + } + + /** + * Return the Oozie URL of the bundle client instance. + * <p/> + * This URL is the base URL for the Oozie system, without protocol + * versioning. + * + * @return the Oozie URL of the bundle client instance. + */ + @Override + public String getOozieUrl() { + return "localoozie"; + } + + /** + * Return the Oozie URL used by the client and server for WS communications. + * <p/> + * This URL is the original URL plus the versioning element path. + * + * @return the Oozie URL used by the client and server for communication. + * @throws OozieClientException thrown in the client + * and the server are not protocol compatible. + */ + @Override + public String getProtocolUrl() throws OozieClientException { + return "localoozie"; + } + + /** + * Validate that the Oozie client and server instances are protocol + * compatible. + * + * @throws OozieClientException thrown in the client + * and the server are not protocol compatible. + */ + @Override + public synchronized void validateWSVersion() throws OozieClientException { + } + + /** + * Create an empty configuration with just the {@link #USER_NAME} set to the + * JVM user name and the {@link #GROUP_NAME} set to 'other'. + * + * @return an empty configuration. + */ + @Override + public Properties createConfiguration() { + Properties conf = new Properties(); + if (bundleEngine != null) { + conf.setProperty(USER_NAME, bundleEngine.getUser()); + } + return conf; + } + + /** + * Set a HTTP header to be used in the WS requests by the bundle + * instance. + * + * @param name header name. + * @param value header value. 
+ */ + @Override + public void setHeader(String name, String value) { + } + + /** + * Get the value of a set HTTP header from the bundle instance. + * + * @param name header name. + * @return header value, <code>null</code> if not set. + */ + @Override + public String getHeader(String name) { + return null; + } + + /** + * Remove a HTTP header from the bundle client instance. + * + * @param name header name. + */ + @Override + public void removeHeader(String name) { + } + + /** + * Return an iterator with all the header names set in the bundle + * instance. + * + * @return header names. + */ + @Override + @SuppressWarnings("unchecked") + public Iterator<String> getHeaderNames() { + return Collections.EMPTY_SET.iterator(); + } + + /** + * Submit a bundle job. + * + * @param conf job configuration. + * @return the job Id. + * @throws OozieClientException thrown if the job + * could not be submitted. + */ + @Override + public String submit(Properties conf) throws OozieClientException { + try { + return bundleEngine.submitJob(new XConfiguration(conf), false); + } catch (BundleEngineException ex) { + throw new OozieClientException(ex.getErrorCode().toString(), ex); + } + } + + /** + * Start a bundle job. + * + * @param jobId job Id. + * @throws OozieClientException thrown if the job + * could not be started. + */ + @Override + @Deprecated + public void start(String jobId) throws OozieClientException { + try { + bundleEngine.start(jobId); + } catch (BundleEngineException ex) { + throw new OozieClientException(ex.getErrorCode().toString(), ex); + } catch (BaseEngineException bex) { + throw new OozieClientException(bex.getErrorCode().toString(), bex); + } + } + + /** + * Submit and start a bundle job. + * + * @param conf job configuration. + * @return the job Id. + * @throws OozieClientException thrown if the job + * could not be submitted. 
+ */ + @Override + public String run(Properties conf) throws OozieClientException { + try { + return bundleEngine.submitJob(new XConfiguration(conf), true); + } catch (BundleEngineException ex) { + throw new OozieClientException(ex.getErrorCode().toString(), ex); + } + } + + /** + * Rerun a workflow job. + * + * @param jobId job Id to rerun. + * @param conf configuration information for the rerun. + * @throws OozieClientException thrown if the job + * could not be started. + */ + @Override + @Deprecated + public void reRun(String jobId, Properties conf) throws OozieClientException { + throw new OozieClientException(ErrorCode.E0301.toString(), "no-op"); + } + + /** + * Rerun bundle coordinators. + * + * @param jobId bundle jobId + * @param coordScope rerun scope for coordinator jobs + * @param dateScope rerun scope for date + * @param refresh true if -refresh is given in command option + * @param noCleanup true if -nocleanup is given in command option + * @throws OozieClientException + */ + @Override + public Void reRunBundle(String jobId, String coordScope, String dateScope, boolean refresh, + boolean noCleanup) throws OozieClientException { + try { + new BundleEngine().reRun(jobId, coordScope, dateScope, refresh, noCleanup); + } catch (BaseEngineException e) { + throw new OozieClientException(e.getErrorCode().toString(), e); + } + return null; + } + + /** + * Suspend a bundle job. + * + * @param jobId job Id. + * @throws OozieClientException thrown if the job + * could not be suspended. + */ + @Override + public void suspend(String jobId) throws OozieClientException { + try { + bundleEngine.suspend(jobId); + } catch (BundleEngineException ex) { + throw new OozieClientException(ex.getErrorCode().toString(), ex); + } + } + + /** + * Resume a bundle job. + * + * @param jobId job Id. + * @throws OozieClientException thrown if the job + * could not be resumed. 
+ */ + @Override + public void resume(String jobId) throws OozieClientException { + try { + bundleEngine.resume(jobId); + } catch (BundleEngineException ex) { + throw new OozieClientException(ex.getErrorCode().toString(), ex); + } + } + + /** + * Kill a bundle job. + * + * @param jobId job Id. + * @throws OozieClientException thrown if the job + * could not be killed. + */ + @Override + public void kill(String jobId) throws OozieClientException { + try { + bundleEngine.kill(jobId); + } catch (BundleEngineException ex) { + throw new OozieClientException(ex.getErrorCode().toString(), ex); + } + } + + /** + * Get the info of a workflow job. + * + * @param jobId job Id. + * @return the job info. + * @throws OozieClientException thrown if the job + * info could not be retrieved. + */ + @Override + @Deprecated + public WorkflowJob getJobInfo(String jobId) throws OozieClientException { + throw new OozieClientException(ErrorCode.E0301.toString(), "no-op"); + } + + /** + * Get the info of a bundle job. + * + * @param jobId job Id. + * @return the job info. + * @throws OozieClientException thrown if the job + * info could not be retrieved. + */ + @Override + public BundleJob getBundleJobInfo(String jobId) throws OozieClientException { + try { + return bundleEngine.getBundleJob(jobId); + } catch (BundleEngineException ex) { + throw new OozieClientException(ex.getErrorCode().toString(), ex); + } catch (BaseEngineException bex) { + throw new OozieClientException(bex.getErrorCode().toString(), bex); + } + } + + /** + * Return the info of the workflow jobs that match the filter. + * + * @param filter job filter. Refer to the {@link OozieClient} for the filter + * syntax. + * @param start jobs offset, base 1. + * @param len number of jobs to return. + * @return a list with the workflow jobs info, without node details. + * @throws OozieClientException thrown if the jobs info could not be + * retrieved. 
+ */ + @Override + @Deprecated + public List<WorkflowJob> getJobsInfo(String filter, int start, int len) throws OozieClientException { + throw new OozieClientException(ErrorCode.E0301.toString(), "no-op"); + } + + /** + * Return the info of the bundle jobs that match the filter. + * + * @param filter job filter. Refer to the {@link OozieClient} for the filter + * syntax. + * @param start jobs offset, base 1. + * @param len number of jobs to return. + * @return a list with the coordinator jobs info + * @throws OozieClientException thrown if the jobs info could not be + * retrieved. + */ + @Override + public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException { + try { + start = (start < 1) ? 1 : start; // taken from oozie API + len = (len < 1) ? 50 : len; + BundleJobInfo info = bundleEngine.getBundleJobs(filter, start, len); + List<BundleJob> jobs = new ArrayList<BundleJob>(); + List<BundleJobBean> jobBeans = info.getBundleJobs(); + for (BundleJobBean jobBean : jobBeans) { + jobs.add(jobBean); + } + return jobs; + + } catch (BundleEngineException ex) { + throw new OozieClientException(ex.getErrorCode().toString(), ex); + } + } + + /** + * Return the info of the workflow jobs that match the filter. + * <p/> + * It returns the first 100 jobs that match the filter. + * + * @param filter job filter. Refer to the {@link org.apache.oozie.LocalOozieClient} for the + * filter syntax. + * @return a list with the workflow jobs info, without node details. + * @throws OozieClientException thrown if the jobs + * info could not be retrieved. 
+ */ + @Override + @Deprecated + public List<WorkflowJob> getJobsInfo(String filter) throws OozieClientException { + throw new OozieClientException(ErrorCode.E0301.toString(), "no-op"); + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java new file mode 100644 index 0000000..217cec9 --- /dev/null +++ b/oozie/src/main/java/org/apache/oozie/client/LocalProxyOozieClient.java @@ -0,0 +1,188 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.oozie.client; + +import org.apache.oozie.BundleEngine; +import org.apache.oozie.LocalOozieClient; +import org.apache.oozie.LocalOozieClientCoord; +import org.apache.oozie.local.LocalOozie; +import org.apache.oozie.service.BundleEngineService; +import org.apache.oozie.service.Services; + +import java.io.PrintStream; +import java.util.List; +import java.util.Properties; + +/** + * Oozie Client for Local Oozie. 
+ */ +public class LocalProxyOozieClient extends OozieClient { + + private static LocalOozieClientBundle localOozieClientBundle; + private static LocalOozieClientCoord localOozieClientCoord; + private static LocalOozieClient localOozieClient; + private static final BundleEngine BUNDLE_ENGINE = Services.get(). + get(BundleEngineService.class).getBundleEngine(System.getProperty("user.name")); + + + private LocalOozieClientBundle getLocalOozieClientBundle() { + if (localOozieClientBundle == null) { + localOozieClientBundle = new LocalOozieClientBundle(BUNDLE_ENGINE); + } + return localOozieClientBundle; + } + + private LocalOozieClient getLocalOozieClient() { + if (localOozieClient == null) { + localOozieClient = (LocalOozieClient) LocalOozie.getClient(); + } + return localOozieClient; + } + + private LocalOozieClientCoord getLocalOozieClientCoord() { + if (localOozieClientCoord == null) { + localOozieClientCoord = (LocalOozieClientCoord) LocalOozie.getCoordClient(); + } + return localOozieClientCoord; + } + + @Override + public BundleJob getBundleJobInfo(String jobId) throws OozieClientException { + return getLocalOozieClientBundle().getBundleJobInfo(jobId); + } + + @Override + public List<BundleJob> getBundleJobsInfo(String filter, int start, int len) throws OozieClientException { + return getLocalOozieClientBundle().getBundleJobsInfo(filter, start, len); + } + + public String run(Properties conf) throws OozieClientException { + return getLocalOozieClientBundle().run(conf); + } + + @Override + public Void reRunBundle(final String jobId, final String coordScope, final String dateScope, + final boolean refresh, final boolean noCleanup) throws OozieClientException { + return getLocalOozieClientBundle().reRunBundle(jobId, coordScope, dateScope, refresh, noCleanup); + } + + @Override + public String dryrun(Properties conf) { + return null; + } + + @Override + public CoordinatorAction getCoordActionInfo(String actionId) throws OozieClientException { + return 
getLocalOozieClientCoord().getCoordActionInfo(actionId); + } + + + @Override + public CoordinatorJob getCoordJobInfo(final String jobId) throws OozieClientException { + return getLocalOozieClientCoord().getCoordJobInfo(jobId); + } + + @Override + public List<CoordinatorJob> getCoordJobsInfo(final String filter, final int start, + final int len) throws OozieClientException { + return getLocalOozieClientCoord().getCoordJobsInfo(filter, start, len); + } + + @Override + public CoordinatorJob getCoordJobInfo(final String jobId, final String filter, + final int start, final int len) throws OozieClientException { + return getLocalOozieClientCoord().getCoordJobInfo(jobId, filter, start, len); + } + + @Override + public List<CoordinatorAction> reRunCoord(final String jobId, final String rerunType, + final String scope, final boolean refresh, + final boolean noCleanup) throws OozieClientException { + return getLocalOozieClientCoord().reRunCoord(jobId, rerunType, scope, refresh, noCleanup); + } + + @Override + public List<WorkflowJob> getJobsInfo(final String filter) throws OozieClientException { + return getLocalOozieClientCoord().getJobsInfo(filter); + } + + @Override + public List<WorkflowJob> getJobsInfo(final String filter, final int start, + final int len) throws OozieClientException { + return getLocalOozieClientCoord().getJobsInfo(filter, start, len); + } + + @Override + public WorkflowJob getJobInfo(final String jobId) throws OozieClientException { + return getLocalOozieClient().getJobInfo(jobId); + } + + + @Override + public WorkflowAction getWorkflowActionInfo(final String actionId) throws OozieClientException { + return getLocalOozieClient().getWorkflowActionInfo(actionId); + } + + @Override + public WorkflowJob getJobInfo(final String jobId, final int start, final int len) throws OozieClientException { + return getLocalOozieClient().getJobInfo(jobId, start, len); + } + + @Override + public String getJobId(final String externalId) throws OozieClientException { + 
return getLocalOozieClient().getJobId(externalId); + } + + @Override + public void reRun(String jobId, Properties conf) throws OozieClientException { + throw new IllegalStateException("Rerun not supported "); + } + + @Override + public void suspend(String jobId) throws OozieClientException { + throw new IllegalStateException("Suspend not supported "); + } + + @Override + public void resume(String jobId) throws OozieClientException { + throw new IllegalStateException("Resume not supported "); + } + + @Override + public void kill(String jobId) throws OozieClientException { + throw new IllegalStateException("Kill not supported"); + } + + @Override + public void change(final String jobId, final String changeValue) throws OozieClientException { + throw new IllegalStateException("Change not supported"); + } + + @Override + public void getJobLog(final String jobId, final String logRetrievalType, + final String logRetrievalScope, final PrintStream ps) throws OozieClientException { + throw new IllegalStateException("Job logs not supported"); + } + + @Override + public String getJobLog(final String jobId) throws OozieClientException { + throw new IllegalStateException("Job logs not supported"); + } + +} + http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 31997e8..34a5471 100644 --- a/pom.xml +++ b/pom.xml @@ -390,6 +390,7 @@ <module>archival</module> <module>rerun</module> <module>prism</module> + <module>unit</module> <module>webapp</module> <module>docs</module> </modules> http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/unit/pom.xml ---------------------------------------------------------------------- diff --git a/unit/pom.xml b/unit/pom.xml new file mode 100644 index 0000000..ae92687 --- /dev/null +++ b/unit/pom.xml @@ -0,0 +1,106 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<!-- + Licensed to the Apache Software Foundation (ASF) 
under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>falcon-main</artifactId> + <groupId>org.apache.falcon</groupId> + <version>0.7-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>falcon-unit</artifactId> + + <dependencies> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-common</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.oozie</groupId> + <artifactId>oozie-core</artifactId> + </dependency> + + <dependency> + <groupId>org.testng</groupId> + <artifactId>testng</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-client</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-hadoop-dependencies</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-oozie-el-extension</artifactId> + </dependency> + + <dependency> + 
<groupId>org.apache.falcon</groupId> + <artifactId>falcon-oozie-adaptor</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-prism</artifactId> + <classifier>classes</classifier> + <version>${project.version}</version> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <configuration> + <source>1.7</source> + <target>1.7</target> + </configuration> + </plugin> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-jar-plugin</artifactId> + <version>2.4</version> + <configuration> + <excludes> + <exclude>**/log4j.xml</exclude> + </excludes> + </configuration> + </plugin> + </plugins> + </build> + +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java new file mode 100644 index 0000000..eebfa2e --- /dev/null +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnit.java @@ -0,0 +1,215 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.unit; + +import org.apache.commons.io.FileUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.hadoop.JailedFileSystem; +import org.apache.falcon.security.CurrentUser; +import org.apache.falcon.service.ServiceInitializer; +import org.apache.falcon.util.RuntimeProperties; +import org.apache.falcon.util.StartupProperties; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.oozie.local.LocalOozie; +import org.apache.oozie.service.Services; +import org.apache.oozie.util.XConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * FalconUnit runs jobs in Local mode and Cluster mode. <p/> Falcon Unit is meant for development/debugging purposes + * only. 
+ */ +public final class FalconUnit { + + private static final Logger LOG = LoggerFactory.getLogger(FalconUnit.class); + private static final String OOZIE_SITE_XML = "oozie-site.xml"; + private static final String OOZIE_DEFAULT_XML = "oozie-default.xml"; + private static final String STORAGE_URL = "jail://global:00"; + private static final String OOZIE_HOME_DIR = "/tmp/oozie-" + System.getProperty("user.name"); + + private static JailedFileSystem jailedFileSystem = new JailedFileSystem(); + private static final ServiceInitializer STARTUP_SERVICES = new ServiceInitializer(); + private static Map<String, String> sysProps; + private static FalconUnitClient falconUnitClient; + private static boolean isLocalMode; + private static boolean isFalconUnitActive = false; + + private FalconUnit() { + } + + + public static synchronized void start(boolean isLocal) throws FalconException, IOException { + if (isFalconUnitActive) { + throw new IllegalStateException("Falcon Unit is already initialized"); + } + isLocalMode = isLocal; + //Initialize Startup and runtime properties + LOG.info("Initializing startup properties ..."); + StartupProperties.get(); + + LOG.info("Initializing runtime properties ..."); + RuntimeProperties.get(); + + //Initializing Services + STARTUP_SERVICES.initialize(); + ConfigurationStore.get(); + + if (isLocalMode) { + setupOozieConfigs(); + initFileSystem(); + } + isFalconUnitActive = true; + + } + + private static void initFileSystem() throws IOException { + Configuration conf = new Configuration(); + conf.set("fs.defaultFS", STORAGE_URL); + jailedFileSystem.initialize(LocalFileSystem.getDefaultUri(conf), conf); + } + + private static void setupOozieConfigs() throws IOException { + sysProps = new HashMap<>(); + String oozieHomeDir = OOZIE_HOME_DIR; + String oozieConfDir = oozieHomeDir + "/conf"; + String oozieHadoopConfDir = oozieConfDir + "/hadoop-conf"; + String oozieActionConfDir = oozieConfDir + "/action-conf"; + String oozieLogsDir = oozieHomeDir + 
"/logs"; + String oozieDataDir = oozieHomeDir + "/data"; + + LocalFileSystem fs = new LocalFileSystem(); + fs.mkdirs(new Path(oozieHomeDir)); + fs.mkdirs(new Path(oozieConfDir)); + fs.mkdirs(new Path(oozieHadoopConfDir)); + fs.mkdirs(new Path(oozieActionConfDir)); + fs.mkdirs(new Path(oozieLogsDir)); + fs.close(); + + setSystemProperty("oozie.home.dir", oozieHomeDir); + setSystemProperty("oozie.data.dir", oozieDataDir); + setSystemProperty("oozie.action.conf", oozieActionConfDir); + setSystemProperty("oozie.log.dir", oozieLogsDir); + setSystemProperty("oozie.log4j.file", "localoozie-log4j.properties"); + setSystemProperty("oozielocal.log", "oozieLogsDir/oozielocal.log"); + + Configuration oozieSiteConf = new Configuration(false); + ClassLoader classLoader = Thread.currentThread().getContextClassLoader(); + InputStream oozieSiteInputStream = classLoader.getResourceAsStream(OOZIE_SITE_XML); + XConfiguration configuration = new XConfiguration(oozieSiteInputStream); + Properties props = configuration.toProperties(); + for (String propName : props.stringPropertyNames()) { + oozieSiteConf.set(propName, props.getProperty(propName)); + } + oozieSiteInputStream.close(); + + InputStream oozieDefaultInputStream = classLoader.getResourceAsStream(OOZIE_DEFAULT_XML); + configuration = new XConfiguration(oozieDefaultInputStream); + String classes = configuration.get(Services.CONF_SERVICE_CLASSES); + oozieSiteConf.set(Services.CONF_SERVICE_CLASSES, classes.replaceAll( + "org.apache.oozie.service.ShareLibService,", "")); + File target = new File(oozieConfDir, OOZIE_SITE_XML); + FileOutputStream outStream = null; + try { + outStream = new FileOutputStream(target); + oozieSiteConf.writeXml(outStream); + } finally { + if (outStream != null) { + outStream.close(); + } + } + oozieDefaultInputStream.close(); + + CurrentUser.authenticate(System.getProperty("user.name")); + } + + public static synchronized void cleanup() throws Exception { + STARTUP_SERVICES.destroy(); + if (isLocalMode) { 
+ cleanUpOozie(); + jailedFileSystem.close(); + } + isFalconUnitActive = false; + } + + private static void cleanUpOozie() throws IOException, FalconException { + LocalOozie.stop(); + FileUtils.deleteDirectory(new File(OOZIE_HOME_DIR)); + resetSystemProperties(); + System.setSecurityManager(null); + } + + public static synchronized FalconUnitClient getClient() throws FalconException { + if (!isFalconUnitActive) { + throw new IllegalStateException("Falcon Unit is not initialized"); + } + if (falconUnitClient == null) { + falconUnitClient = new FalconUnitClient(); + } + return falconUnitClient; + } + + public static FileSystem getFileSystem() throws IOException { + if (!isFalconUnitActive) { + throw new IllegalStateException("Falcon Unit is not initialized"); + } + return jailedFileSystem; + } + + // Setting System properties and store their actual values + private static void setSystemProperty(String name, String value) { + if (!sysProps.containsKey(name)) { + String currentValue = System.getProperty(name); + sysProps.put(name, currentValue); + } + if (value != null) { + System.setProperty(name, value); + } else { + System.getProperties().remove(name); + } + } + + + /** + * Reset changed system properties to their original values. 
+ */ + private static void resetSystemProperties() { + if (sysProps != null) { + for (Map.Entry<String, String> entry : sysProps.entrySet()) { + if (entry.getValue() != null) { + System.setProperty(entry.getKey(), entry.getValue()); + } else { + System.getProperties().remove(entry.getKey()); + } + } + sysProps.clear(); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java new file mode 100644 index 0000000..e898fc3 --- /dev/null +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitClient.java @@ -0,0 +1,250 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.unit; + +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; +import org.apache.falcon.client.AbstractFalconClient; +import org.apache.falcon.client.FalconCLIException; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.parser.EntityParser; +import org.apache.falcon.entity.parser.EntityParserFactory; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.feed.Feed; +import org.apache.falcon.entity.v0.process.Cluster; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.entity.v0.process.Validity; +import org.apache.falcon.resource.APIResult; +import org.apache.falcon.resource.InstancesResult; +import org.apache.falcon.util.DateUtil; +import org.apache.falcon.workflow.WorkflowEngineFactory; +import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.Date; +import java.util.List; +import java.util.TimeZone; + +/** + * Client for Falcon Unit. + */ +public class FalconUnitClient extends AbstractFalconClient { + + private static final Logger LOG = LoggerFactory.getLogger(FalconUnitClient.class); + + protected ConfigurationStore configStore; + private AbstractWorkflowEngine workflowEngine; + + public FalconUnitClient() throws FalconException { + configStore = ConfigurationStore.get(); + workflowEngine = WorkflowEngineFactory.getWorkflowEngine(); + } + + public ConfigurationStore getConfigStore() { + return this.configStore; + } + + + /** + * Submit a new entity. Entities can be of type feed, process or data end + * points. 
Entity definitions are validated structurally against schema and + * subsequently for other rules before they are admitted into the system + * + * @param type entity type + * @param filePath path for the definition of entity + * @return boolean + */ + @Override + public APIResult submit(String type, String filePath) throws IOException, FalconCLIException { + try { + EntityType entityType = EntityType.getEnum(type); + InputStream entityStream = FalconUnitHelper.getFileInputStream(filePath); + EntityParser entityParser = EntityParserFactory.getParser(entityType); + Entity entity = entityParser.parse(entityStream); + + Entity existingEntity = configStore.get(entityType, entity.getName()); + if (existingEntity != null) { + if (EntityUtil.equals(existingEntity, entity)) { + LOG.warn(entity.toShortString() + " already registered with same definition " + entity.getName()); + return new APIResult(APIResult.Status.SUCCEEDED, "{} already registered with same definition" + + entity.getName()); + } + LOG.warn(entity.toShortString() + " already registered with different definition " + + "Can't be submitted again. Try removing before submitting."); + return new APIResult(APIResult.Status.FAILED, "{} already registered with different definition " + + "Can't be submitted again. Try removing before submitting." + entity.getName()); + } + + entityParser.validate(entity); + configStore.publish(entityType, entity); + LOG.info("Submit successful: ({}): {}", entityType.name(), entity.getName()); + return new APIResult(APIResult.Status.SUCCEEDED, "Submit successful (" + type + ") " + entity.getName()); + } catch (FalconException e) { + throw new FalconCLIException("FAILED", e); + } + } + + /** + * Schedules submitted entity. 
+ * + * @param entityType entity Type + * @param entityName entity name + * @param cluster cluster on which it has to be scheduled + * @return + * @throws FalconCLIException + * @throws FalconException + */ + @Override + public APIResult schedule(EntityType entityType, String entityName, String cluster) throws FalconCLIException { + return schedule(entityType, entityName, null, 0, cluster); + } + + + /** + * Schedules a submitted process entity immediately. + * + * @param entityName entity name + * @param startTime start time for process while scheduling + * @param numInstances numInstances of process to be scheduled + * @param cluster cluster on which process to be scheduled + * @return APIResult describing the outcome of the schedule request + */ + public APIResult schedule(EntityType entityType, String entityName, String startTime, int numInstances, + String cluster) throws FalconCLIException { + try { + FalconUnitHelper.checkSchedulableEntity(entityType.toString()); + Entity entity = EntityUtil.getEntity(entityType, entityName); + boolean clusterPresent = checkAndUpdateCluster(entity, entityType, cluster); + if (!clusterPresent) { + LOG.warn("Cluster is not registered with this entity " + entityName); + return new APIResult(APIResult.Status.FAILED, entity + "Cluster is not registered with this entity " + + entityName); + } + if (StringUtils.isNotEmpty(startTime) && entityType == EntityType.PROCESS) { + updateStartAndEndTime((Process) entity, startTime, numInstances, cluster); + } + workflowEngine.schedule(entity); + LOG.info(entityName + " is scheduled successfully"); + return new APIResult(APIResult.Status.SUCCEEDED, entity + "(" + "PROCESS" + ") scheduled successfully"); + } catch (FalconException e) { + throw new FalconCLIException("FAILED", e); + } + } + + /** + * Instance status for a given nominalTime. 
+ * + * @param entityType entity type + * @param entityName entity name + * @param nominalTime nominal time of process + * @return InstancesResult.WorkflowStatus + */ + public InstancesResult.WorkflowStatus getInstanceStatus(EntityType entityType, String entityName, + String nominalTime) throws Exception { + if (entityType == EntityType.CLUSTER) { + throw new IllegalArgumentException("Instance management functions don't apply to Cluster entities"); + } + Entity entityObject = EntityUtil.getEntity(entityType, entityName); + Date startTime = SchemaHelper.parseDateUTC(nominalTime); + Date endTime = DateUtil.getNextMinute(startTime); + List<LifeCycle> lifeCycles = FalconUnitHelper.checkAndUpdateLifeCycle(null, entityType.name()); + InstancesResult instancesResult = workflowEngine.getStatus(entityObject, startTime, endTime, lifeCycles); + if (instancesResult.getInstances() != null && instancesResult.getInstances().length > 0 + && instancesResult.getInstances()[0] != null) { + LOG.info("Instance status is " + instancesResult.getInstances()[0].getStatus()); + return instancesResult.getInstances()[0].getStatus(); + } + return null; + } + + private boolean checkAndUpdateCluster(Entity entity, EntityType entityType, String cluster) { + if (entityType == EntityType.FEED) { + return checkAndUpdateFeedClusters(entity, cluster); + } else if (entityType == EntityType.PROCESS) { + return checkAndUpdateProcessClusters(entity, cluster); + } else { + throw new IllegalArgumentException("entity type {} is not supported " + entityType); + } + } + + private boolean checkAndUpdateProcessClusters(Entity entity, String cluster) { + Process processEntity = (Process) entity; + List<Cluster> clusters = processEntity.getClusters().getClusters(); + List<Cluster> newClusters = new ArrayList<>(); + if (clusters != null) { + for (Cluster processCluster : clusters) { + if (processCluster.getName().equalsIgnoreCase(cluster)) { + newClusters.add(processCluster); + } + } + } + if 
(newClusters.isEmpty()) {
+            LOG.warn("Cluster is not registered with this entity " + entity.getName());
+            return false;
+        }
+        processEntity.getClusters().getClusters().removeAll(clusters);
+        processEntity.getClusters().getClusters().addAll(newClusters);
+        return true;
+    }
+
+    /**
+     * Narrows the feed's cluster list to the entries whose name matches the given cluster
+     * (case-insensitive).
+     *
+     * @return true when at least one cluster matched and the list was rewritten; false, with the list
+     *         left untouched, when none matched
+     */
+    private boolean checkAndUpdateFeedClusters(Entity entity, String cluster) {
+        Feed feed = (Feed) entity;
+        List<org.apache.falcon.entity.v0.feed.Cluster> registered = feed.getClusters().getClusters();
+        List<org.apache.falcon.entity.v0.feed.Cluster> matching = new ArrayList<>();
+        if (registered != null) {
+            for (org.apache.falcon.entity.v0.feed.Cluster candidate : registered) {
+                if (!candidate.getName().equalsIgnoreCase(cluster)) {
+                    continue;
+                }
+                matching.add(candidate);
+            }
+        }
+        if (!matching.isEmpty()) {
+            feed.getClusters().getClusters().removeAll(registered);
+            feed.getClusters().getClusters().addAll(matching);
+            return true;
+        }
+        LOG.warn("Cluster is not registered with this entity " + entity.getName());
+        return false;
+    }
+
+    /**
+     * Replaces the validity of every process cluster named like the given cluster (case-insensitive).
+     * The new validity starts at startTimeStr and ends at the time EntityUtil.getNextInstanceTime
+     * returns for that start, the process frequency, the UTC time zone and numInstances.
+     */
+    private void updateStartAndEndTime(Process processEntity, String startTimeStr, int numInstances, String cluster) {
+        List<Cluster> processClusters = processEntity.getClusters().getClusters();
+        if (processClusters == null) {
+            return;
+        }
+        for (Cluster candidate : processClusters) {
+            if (!candidate.getName().equalsIgnoreCase(cluster)) {
+                continue;
+            }
+            Date windowStart = SchemaHelper.parseDateUTC(startTimeStr);
+            Date windowEnd = EntityUtil.getNextInstanceTime(windowStart, processEntity.getFrequency(),
+                    TimeZone.getTimeZone("UTC"), numInstances);
+            Validity window = new Validity();
+            window.setStart(windowStart);
+            window.setEnd(windowEnd);
+            candidate.setValidity(window);
+        }
+    }
+}
+
http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/unit/src/main/java/org/apache/falcon/unit/FalconUnitHelper.java
----------------------------------------------------------------------
diff --git a/unit/src/main/java/org/apache/falcon/unit/FalconUnitHelper.java
b/unit/src/main/java/org/apache/falcon/unit/FalconUnitHelper.java new file mode 100644 index 0000000..604a3f9 --- /dev/null +++ b/unit/src/main/java/org/apache/falcon/unit/FalconUnitHelper.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.unit; + + +import org.apache.falcon.FalconException; +import org.apache.falcon.LifeCycle; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.UnschedulableEntityException; + +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.InputStream; +import java.util.ArrayList; +import java.util.List; + +/** + * Utility class for Falcon Unit. + */ +public final class FalconUnitHelper { + private FalconUnitHelper() { + } + + /** + * Converts a InputStream into FileInputStream. 
+ * + * @param filePath - Path of file to stream + * @return ServletInputStream + * @throws org.apache.falcon.FalconException + */ + public static InputStream getFileInputStream(String filePath) throws FalconException { + if (filePath == null) { + throw new IllegalArgumentException("file path should not be null"); + } + InputStream stream; + try { + stream = new FileInputStream(filePath); + } catch (FileNotFoundException e) { + throw new FalconException("File not found: " + filePath); + } + return stream; + } + + /** + * Updates lifecycle based on entity. + * + * @param lifeCycleValues + * @param type entity type + * @return list of lifecycle values after check and update + */ + public static List<LifeCycle> checkAndUpdateLifeCycle(List<LifeCycle> lifeCycleValues, + String type) throws FalconException { + EntityType entityType = EntityType.getEnum(type); + if (lifeCycleValues == null || lifeCycleValues.isEmpty()) { + List<LifeCycle> lifeCycles = new ArrayList<LifeCycle>(); + if (entityType == EntityType.PROCESS) { + lifeCycles.add(LifeCycle.valueOf(LifeCycle.EXECUTION.name())); + } else if (entityType == EntityType.FEED) { + lifeCycles.add(LifeCycle.valueOf(LifeCycle.REPLICATION.name())); + } + return lifeCycles; + } + for (LifeCycle lifeCycle : lifeCycleValues) { + if (entityType != lifeCycle.getTag().getType()) { + throw new FalconException("Incorrect lifecycle: " + lifeCycle + "for given type: " + type); + } + } + return lifeCycleValues; + } + + /** + * Checks entity is schedulable or not. 
+ * + * @param type + * @throws UnschedulableEntityException + */ + public static void checkSchedulableEntity(String type) throws UnschedulableEntityException { + EntityType entityType = EntityType.getEnum(type); + if (!entityType.isSchedulable()) { + throw new UnschedulableEntityException( + "Entity type (" + type + ") " + " cannot be Scheduled/Suspended/Resumed"); + } + } +} + http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/unit/src/main/java/org/apache/falcon/unit/LocalFalconClientProtocolProvider.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalFalconClientProtocolProvider.java b/unit/src/main/java/org/apache/falcon/unit/LocalFalconClientProtocolProvider.java new file mode 100644 index 0000000..060b662 --- /dev/null +++ b/unit/src/main/java/org/apache/falcon/unit/LocalFalconClientProtocolProvider.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.falcon.unit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.LocalJobRunner; +import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; + +import java.io.IOException; +import java.net.InetSocketAddress; + +/** + * Local ClientProtocol provider for Hadoop. + */ +public class LocalFalconClientProtocolProvider extends ClientProtocolProvider { + + private LocalJobRunner localJobRunner = null; + private static final String UNIT = "unit"; + + @Override + public ClientProtocol create(Configuration conf) throws IOException { + String framework = conf.get(MRConfig.FRAMEWORK_NAME, UNIT); + if (!UNIT.equals(framework)) { + return null; + } + return getLocalJobRunner(conf); + } + + @Override + public ClientProtocol create(InetSocketAddress inetSocketAddress, Configuration conf) throws IOException { + return create(conf); + } + + @Override + public void close(ClientProtocol clientProtocol) throws IOException { + + } + + private synchronized LocalJobRunner getLocalJobRunner(Configuration conf) throws IOException { + if (localJobRunner == null) { + localJobRunner = new LocalJobRunner(conf); + } + return localJobRunner; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/unit/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider b/unit/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider new file mode 100644 index 0000000..2891352 --- /dev/null +++ b/unit/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under 
one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + org.apache.falcon.unit.LocalFalconClientProtocolProvider \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/8cdac2bb/unit/src/main/resources/core-site.xml ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/core-site.xml b/unit/src/main/resources/core-site.xml new file mode 100644 index 0000000..fd8550f --- /dev/null +++ b/unit/src/main/resources/core-site.xml @@ -0,0 +1,38 @@ +<?xml version="1.0"?> +<?xml-stylesheet type="text/xsl" href="configuration.xsl"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ See the License for the specific language governing permissions and + limitations under the License. +--> +<configuration> + + <property> + <name>fs.fsext.impl</name> + <value>org.apache.falcon.hadoop.FileSystemExtension</value> + </property> + + <property> + <name>fs.defaultFS</name> + <value>jail://global:00</value> + </property> + + <property> + <name>fs.jail.impl</name> + <value>org.apache.falcon.hadoop.JailedFileSystem</value> + </property> + + +</configuration>
