Removing addons/ non-docs directory from asf-site branch
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/6f5b476c
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/6f5b476c
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/6f5b476c

Branch: refs/heads/asf-site
Commit: 6f5b476ccd8fa4ff1e9aea36d44a85309a9b932e
Parents: 8609ffd
Author: Pallavi Rao <[email protected]>
Authored: Tue Mar 1 12:54:02 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Tue Mar 1 12:54:02 2016 +0530

----------------------------------------------------------------------
 addons/adf/README | 59 --
 addons/adf/pom.xml | 112 ---
 .../apache/falcon/adfservice/ADFHiveJob.java | 123 ----
 .../org/apache/falcon/adfservice/ADFJob.java | 556 ---------------
 .../apache/falcon/adfservice/ADFJobFactory.java | 43 --
 .../org/apache/falcon/adfservice/ADFPigJob.java | 70 --
 .../falcon/adfservice/ADFProviderService.java | 370 ----------
 .../falcon/adfservice/ADFReplicationJob.java | 71 --
 .../falcon/adfservice/ADFScheduledExecutor.java | 71 --
 .../org/apache/falcon/adfservice/DataFeed.java | 110 ---
 .../java/org/apache/falcon/adfservice/Feed.java | 39 -
 .../org/apache/falcon/adfservice/Process.java | 148 ----
 .../org/apache/falcon/adfservice/TableFeed.java | 125 ----
 .../adfservice/util/ADFJsonConstants.java | 73 --
 .../apache/falcon/adfservice/util/FSUtils.java | 102 ---
 addons/designer/actions/pom.xml | 46 --
 .../configuration/EmailActionConfiguration.java | 74 --
 .../designer/primitive/action/EmailAction.java | 92 ---
 addons/designer/checkstyle/pom.xml | 28 -
 .../resources/falcon/checkstyle-java-header.txt | 17 -
 .../resources/falcon/checkstyle-noframes.xsl | 218 ------
 .../src/main/resources/falcon/checkstyle.xml | 233 ------
 .../main/resources/falcon/findbugs-exclude.xml | 34 -
 addons/designer/common/pom.xml | 42 --
 addons/designer/core/pom.xml | 81 ---
 .../configuration/ActionConfiguration.java | 32 -
 .../designer/configuration/Configuration.java | 81 ---
 .../designer/configuration/FlowConfig.java | 69 --
 .../designer/configuration/SerdeException.java | 61 --
 .../configuration/TransformConfiguration.java | 33 -
 .../falcon/designer/primitive/Action.java | 102 ---
 .../apache/falcon/designer/primitive/Code.java | 27 -
 .../primitive/CompilationException.java | 60 --
 .../falcon/designer/primitive/Message.java | 67 --
 .../falcon/designer/primitive/Primitive.java | 159 -----
 .../falcon/designer/primitive/Transform.java | 88 ---
 .../falcon/designer/schema/RelationalData.java | 53 --
 .../designer/schema/RelationalSchema.java | 84 ---
 .../falcon/designer/source/DataSource.java | 29 -
 .../apache/falcon/designer/storage/Storage.java | 67 --
 .../designer/storage/StorageException.java | 63 --
 .../falcon/designer/storage/Storeable.java | 52 --
 .../apache/falcon/designer/storage/Version.java | 71 --
 .../designer/storage/VersionedStorage.java | 111 ---
 .../designer/storage/impl/HDFSStorage.java | 98 ---
 .../designer/storage/impl/HDFSStorageTest.java | 78 --
 addons/designer/flows/pom.xml | 46 --
 .../apache/falcon/designer/primitive/Flow.java | 83 ---
 addons/designer/pom.xml | 709 -------------------
 addons/designer/transforms/pom.xml | 42 --
 addons/designer/ui/pom.xml | 95 ---
 .../designer/ui/src/main/webapp/WEB-INF/web.xml | 49 --
 addons/hivedr/README | 80 ---
 addons/hivedr/pom.xml | 209 ------
 .../apache/falcon/hive/DefaultPartitioner.java | 317 ---------
 .../org/apache/falcon/hive/EventSourcer.java | 31 -
 .../java/org/apache/falcon/hive/HiveDRArgs.java | 122 ----
 .../org/apache/falcon/hive/HiveDROptions.java | 183 -----
 .../java/org/apache/falcon/hive/HiveDRTool.java | 393 ----------
 .../falcon/hive/LastReplicatedEvents.java | 196 -----
 .../falcon/hive/MetaStoreEventSourcer.java | 204 ------
 .../org/apache/falcon/hive/Partitioner.java | 42 --
 .../falcon/hive/ReplicationEventMetadata.java | 34 -
 .../exception/HiveReplicationException.java | 49 --
 .../falcon/hive/mapreduce/CopyCommitter.java | 65 --
 .../falcon/hive/mapreduce/CopyMapper.java | 104 ---
 .../falcon/hive/mapreduce/CopyReducer.java | 85 ---
 .../falcon/hive/util/DBReplicationStatus.java | 213 ------
 .../apache/falcon/hive/util/DRStatusStore.java | 104 ---
 .../apache/falcon/hive/util/DelimiterUtils.java | 30 -
 .../falcon/hive/util/EventSourcerUtils.java | 189 -----
 .../org/apache/falcon/hive/util/EventUtils.java | 393 ----------
 .../org/apache/falcon/hive/util/FileUtils.java | 68 --
 .../falcon/hive/util/HiveDRStatusStore.java | 315 --------
 .../apache/falcon/hive/util/HiveDRUtils.java | 99 ---
 .../falcon/hive/util/HiveMetastoreUtils.java | 92 ---
 .../falcon/hive/util/ReplicationStatus.java | 221 ------
 addons/hivedr/src/main/resources/log4j.xml | 54 --
 .../falcon/hive/DBReplicationStatusTest.java | 230 ------
 .../java/org/apache/falcon/hive/DRTest.java | 45 --
 .../falcon/hive/HiveDRStatusStoreTest.java | 343 ---------
 .../java/org/apache/falcon/hive/HiveDRTest.java | 252 -------
 .../falcon/hive/ReplicationStatusTest.java | 137 ----
 addons/recipes/hdfs-replication/README.txt | 29 -
 addons/recipes/hdfs-replication/pom.xml | 32 -
 .../resources/hdfs-replication-template.xml | 44 --
 .../resources/hdfs-replication-workflow.xml | 82 ---
 .../main/resources/hdfs-replication.properties | 79 ---
 .../recipes/hive-disaster-recovery/README.txt | 58 --
 addons/recipes/hive-disaster-recovery/pom.xml | 32 -
 .../hive-disaster-recovery-secure-template.xml | 45 --
 .../hive-disaster-recovery-secure-workflow.xml | 357 ----------
 .../hive-disaster-recovery-secure.properties | 108 ---
 .../hive-disaster-recovery-template.xml | 45 --
 .../hive-disaster-recovery-workflow.xml | 249 -------
 .../resources/hive-disaster-recovery.properties | 98 ---
 96 files changed, 11603 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/README
----------------------------------------------------------------------
diff --git a/addons/adf/README b/addons/adf/README
deleted file mode 100644
index 39883b8..0000000
--- a/addons/adf/README
+++ /dev/null
@@ -1,59 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-ADF Provider
-=======================
-
-
-Overview
----------
-
-This integration allows Microsoft Azure Data Factory pipelines to invoke Falcon activities
-(i.e. replication, hive and pig processing work), so the user can build hybrid Hadoop data pipelines
-leveraging on-premises Hadoop clusters and cloud based Cortana Analytics services
-like HDInsight Hadoop clusters and Azure Machine Learning.
-
-
-Usage
----------
-
-Falcon reads Azure Service Bus credentials from conf/startup.properties when it starts.
-So, the credential needs to be added before starting Falcon,
-and Falcon needs to be restarted if there is any change in the credential.
-
-Example:
-
-######### ADF Configurations start #########
-
-# A String object that represents the namespace
-*.microsoft.windowsazure.services.servicebus.namespace=hwtransport
-
-# Request and status queues on the namespace
-*.microsoft.windowsazure.services.servicebus.requestqueuename=adfrequest
-*.microsoft.windowsazure.services.servicebus.statusqueuename=adfstatus
-
-# A String object that contains the SAS key name
-*.microsoft.windowsazure.services.servicebus.sasKeyName=RootManageSharedAccessKey
-
-# A String object that contains the SAS key
-*.microsoft.windowsazure.services.servicebus.sasKey=4kt2x6yEoWZZSFZofyXEoxly0knHL7FPMqLD14ov1jo=
-
-# A String object containing the base URI that is added to your Service Bus namespace to form the URI to connect
-# to the Service Bus service. To access the default public Azure service, pass ".servicebus.windows.net"
-*.microsoft.windowsazure.services.servicebus.serviceBusRootUri=.servicebus.windows.net
-
-# Service bus polling frequency (in seconds)
-*.microsoft.windowsazure.services.servicebus.polling.frequency=60

http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/pom.xml
----------------------------------------------------------------------
diff --git a/addons/adf/pom.xml b/addons/adf/pom.xml
deleted file mode 100644
index 898791e..0000000
--- a/addons/adf/pom.xml
+++ /dev/null
@@ -1,112 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.falcon</groupId> - <artifactId>falcon-main</artifactId> - <version>0.10-SNAPSHOT</version> - <relativePath>../../pom.xml</relativePath> - </parent> - <artifactId>falcon-adf</artifactId> - <description>Apache Falcon ADF Integration</description> - <name>Apache Falcon ADF Integration</name> - <packaging>jar</packaging> - - <properties> - <azure.version>0.8.0</azure.version> - </properties> - - <dependencies> - <dependency> - <groupId>org.apache.falcon</groupId> - <artifactId>falcon-common</artifactId> - <exclusions> - <exclusion> - <groupId>javax.servlet.jsp</groupId> - <artifactId>jsp-api</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.falcon</groupId> - <artifactId>falcon-prism</artifactId> - <version>${project.version}</version> - <classifier>classes</classifier> - </dependency> - - <dependency> - <groupId>com.microsoft.azure</groupId> - <artifactId>azure-servicebus</artifactId> - <version>${azure.version}</version> - <exclusions> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-core-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.codehaus.jackson</groupId> - <artifactId>jackson-mapper-asl</artifactId> - </exclusion> - <exclusion> - <groupId>org.apache.httpcomponents</groupId> - <artifactId>httpclient</artifactId> - </exclusion> - </exclusions> - </dependency> - </dependencies> - - <profiles> - <profile> - <id>hadoop-2</id> - <activation> - <activeByDefault>true</activeByDefault> - </activation> - <dependencies> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </dependency> - </dependencies> - </profile> - </profiles> - - <build> - <sourceDirectory>${basedir}/src/main/java</sourceDirectory> - <!--<testSourceDirectory>${basedir}/src/test/java</testSourceDirectory>--> - <plugins> - <plugin> - <groupId>org.apache.maven.plugins</groupId> - <artifactId>maven-jar-plugin</artifactId> - <executions> - <execution> - <goals> - <goal>test-jar</goal> - </goals> - </execution> - </executions> - </plugin> - </plugins> - </build> - -</project> http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java ---------------------------------------------------------------------- diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java deleted file mode 100644 index 6412c73..0000000 --- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFHiveJob.java +++ /dev/null @@ -1,123 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.adfservice; - -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.adfservice.util.ADFJsonConstants; -import org.apache.falcon.FalconException; -import org.json.JSONException; -import org.json.JSONObject; - -/** - * Azure ADF Hive Job. - */ -public class ADFHiveJob extends ADFJob { - private static final String HIVE_SCRIPT_EXTENSION = ".hql"; - private static final String ENGINE_TYPE = "hive"; - private static final String INPUT_FEED_SUFFIX = "-hive-input-feed"; - private static final String OUTPUT_FEED_SUFFIX = "-hive-output-feed"; - - private String hiveScriptPath; - private TableFeed inputFeed; - private TableFeed outputFeed; - - public ADFHiveJob(String message, String id) throws FalconException { - super(message, id); - type = JobType.HIVE; - inputFeed = getInputTableFeed(); - outputFeed = getOutputTableFeed(); - hiveScriptPath = activityHasScriptPath() ? getScriptPath() : createScriptFile(HIVE_SCRIPT_EXTENSION); - } - - @Override - public void startJob() throws FalconException { - startProcess(inputFeed, outputFeed, ENGINE_TYPE, hiveScriptPath); - } - - @Override - public void cleanup() throws FalconException { - cleanupProcess(inputFeed, outputFeed); - } - - private TableFeed getInputTableFeed() throws FalconException { - return getTableFeed(jobEntityName() + INPUT_FEED_SUFFIX, getInputTables().get(0), - getTableCluster(getInputTables().get(0))); - } - - private TableFeed getOutputTableFeed() throws FalconException { - return getTableFeed(jobEntityName() + OUTPUT_FEED_SUFFIX, getOutputTables().get(0), - getTableCluster(getOutputTables().get(0))); - } - - private TableFeed getTableFeed(final String feedName, final String tableName, - final String clusterName) throws FalconException { - JSONObject tableExtendedProperties = getTableExtendedProperties(tableName); - String tableFeedName; - String partitions; - - try { - tableFeedName = tableExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_TABLE_NAME); - if (StringUtils.isBlank(tableFeedName)) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TABLE_NAME + " cannot" - + " be empty in ADF request."); - } - partitions = tableExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_TABLE_PARTITION); - if (StringUtils.isBlank(partitions)) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TABLE_PARTITION + " cannot" - + " be empty in ADF request."); - } - } catch (JSONException e) { - throw new FalconException("Error while parsing ADF JSON message: " + tableExtendedProperties, e); - } - - return new TableFeed.Builder().withFeedName(feedName).withFrequency(frequency) - .withClusterName(clusterName).withStartTime(startTime).withEndTime(endTime). 
- withAclOwner(proxyUser).withTableName(tableFeedName).withPartitions(partitions).build(); - } - - private JSONObject getTableExtendedProperties(final String tableName) throws FalconException { - JSONObject table = tablesMap.get(tableName); - if (table == null) { - throw new FalconException("JSON object table " + tableName + " not found in ADF request."); - } - - try { - JSONObject tableProperties = table.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES); - if (tableProperties == null) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_PROPERTIES - + " not found in ADF request."); - } - JSONObject tablesLocation = tableProperties.getJSONObject(ADFJsonConstants.ADF_REQUEST_LOCATION); - if (tablesLocation == null) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_LOCATION - + " not found in ADF request."); - } - - JSONObject tableExtendedProperties = tablesLocation.getJSONObject(ADFJsonConstants. - ADF_REQUEST_EXTENDED_PROPERTIES); - if (tableExtendedProperties == null) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES - + " not found in ADF request."); - } - return tableExtendedProperties; - } catch (JSONException e) { - throw new FalconException("Error while parsing ADF JSON message: " + table, e); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java ---------------------------------------------------------------------- diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java deleted file mode 100644 index 5d81338..0000000 --- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJob.java +++ /dev/null @@ -1,556 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.adfservice; - -import java.io.InputStream; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.regex.Pattern; - -import org.apache.commons.lang3.StringUtils; -import org.apache.commons.io.IOUtils; -import org.apache.falcon.adfservice.util.ADFJsonConstants; -import org.apache.falcon.adfservice.util.FSUtils; -import org.apache.falcon.entity.EntityUtil; -import org.apache.falcon.entity.v0.Entity; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.FalconException; -import org.apache.falcon.resource.AbstractSchedulableEntityManager; -import org.apache.falcon.security.CurrentUser; -import org.apache.hadoop.fs.Path; -import org.json.JSONArray; -import org.json.JSONException; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Base class for Azure ADF jobs. - */ -public abstract class ADFJob { - private static final Logger LOG = LoggerFactory.getLogger(ADFJob.class); - - // name prefix for all adf related entity, e.g. an adf hive process and the feeds associated with it - public static final String ADF_ENTITY_NAME_PREFIX = "ADF-"; - public static final int ADF_ENTITY_NAME_PREFIX_LENGTH = ADF_ENTITY_NAME_PREFIX.length(); - // name prefix for all adf related job entity, i.e. adf hive/pig process and replication feed - public static final String ADF_JOB_ENTITY_NAME_PREFIX = ADF_ENTITY_NAME_PREFIX + "JOB-"; - public static final int ADF_JOB_ENTITY_NAME_PREFIX_LENGTH = ADF_JOB_ENTITY_NAME_PREFIX.length(); - - public static final String TEMPLATE_PATH_PREFIX = "/apps/falcon/adf/"; - public static final String PROCESS_SCRIPTS_PATH = TEMPLATE_PATH_PREFIX - + Path.SEPARATOR + "generatedscripts"; - private static final String DEFAULT_FREQUENCY = "days(1)"; - - public static boolean isADFJobEntity(String entityName) { - return entityName.startsWith(ADF_JOB_ENTITY_NAME_PREFIX); - } - - public static String getSessionID(String entityName) throws FalconException { - if (!isADFJobEntity(entityName)) { - throw new FalconException("The entity, " + entityName + ", is not an ADF Job Entity."); - } - return entityName.substring(ADF_JOB_ENTITY_NAME_PREFIX_LENGTH); - } - - /** - * Enum for job type. 
- */ - public static enum JobType { - HIVE, PIG, REPLICATION - } - - private static enum RequestType { - HADOOPMIRROR, HADOOPHIVE, HADOOPPIG - } - - public static JobType getJobType(String msg) throws FalconException { - try { - JSONObject obj = new JSONObject(msg); - JSONObject activity = obj.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY); - if (activity == null) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_ACTIVITY + " not found in ADF" - + " request."); - } - - JSONObject activityProperties = activity.getJSONObject(ADFJsonConstants.ADF_REQUEST_TRANSFORMATION); - if (activityProperties == null) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TRANSFORMATION + " not found " - + "in ADF request."); - } - - String type = activityProperties.getString(ADFJsonConstants.ADF_REQUEST_TYPE); - if (StringUtils.isBlank(type)) { - throw new FalconException(ADFJsonConstants.ADF_REQUEST_TYPE + " not found in ADF request msg"); - } - - switch (RequestType.valueOf(type.toUpperCase())) { - case HADOOPMIRROR: - return JobType.REPLICATION; - case HADOOPHIVE: - return JobType.HIVE; - case HADOOPPIG: - return JobType.PIG; - default: - throw new FalconException("Unrecognized ADF job type: " + type); - } - } catch (JSONException e) { - throw new FalconException("Error while parsing ADF JSON message: " + msg, e); - } - } - - public abstract void startJob() throws FalconException; - public abstract void cleanup() throws FalconException; - - protected JSONObject message; - protected JSONObject activity; - protected JSONObject activityExtendedProperties; - protected String id; - protected JobType type; - protected String startTime, endTime; - protected String frequency; - protected String proxyUser; - protected long timeout; - protected ADFJobManager jobManager = new ADFJobManager(); - - private Map<String, JSONObject> linkedServicesMap = new HashMap<String, JSONObject>(); - protected Map<String, JSONObject> tablesMap = new HashMap<String, JSONObject>(); - - public ADFJob(String msg, String id) throws FalconException { - this.id = id; - FSUtils.createDir(new Path(PROCESS_SCRIPTS_PATH)); - try { - message = new JSONObject(msg); - - frequency = DEFAULT_FREQUENCY; - startTime = transformTimeFormat(message.getString(ADFJsonConstants.ADF_REQUEST_START_TIME)); - endTime = transformTimeFormat(message.getString(ADFJsonConstants.ADF_REQUEST_END_TIME)); - - JSONArray linkedServices = message.getJSONArray(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICES); - for (int i = 0; i < linkedServices.length(); i++) { - JSONObject linkedService = linkedServices.getJSONObject(i); - linkedServicesMap.put(linkedService.getString(ADFJsonConstants.ADF_REQUEST_NAME), linkedService); - } - - JSONArray tables = message.getJSONArray(ADFJsonConstants.ADF_REQUEST_TABLES); - for (int i = 0; i < tables.length(); i++) { - JSONObject table = tables.getJSONObject(i); - tablesMap.put(table.getString(ADFJsonConstants.ADF_REQUEST_NAME), table); - } - - // Set the activity extended properties - activity = message.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY); - if (activity == null) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_ACTIVITY + " not found in ADF" - + " request."); - } - - JSONObject policy = activity.getJSONObject(ADFJsonConstants.ADF_REQUEST_POLICY); - /* IS policy mandatory */ - if (policy == null) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_POLICY + " not found" - + " in ADF request."); - } - String adfTimeout = 
policy.getString(ADFJsonConstants.ADF_REQUEST_TIMEOUT); - if (StringUtils.isBlank(adfTimeout)) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TIMEOUT + " not found" - + " in ADF request."); - } - timeout = parseADFRequestTimeout(adfTimeout); - - JSONObject activityProperties = activity.getJSONObject(ADFJsonConstants.ADF_REQUEST_TRANSFORMATION); - if (activityProperties == null) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_TRANSFORMATION + " not found" - + " in ADF request."); - } - - activityExtendedProperties = activityProperties.getJSONObject( - ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES); - if (activityExtendedProperties == null) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES + " not" - + " found in ADF request."); - } - - // should be called after setting activityExtendedProperties - proxyUser = getRunAsUser(); - - // log in the user - CurrentUser.authenticate(proxyUser); - } catch (JSONException e) { - throw new FalconException("Error while parsing ADF JSON message: " + msg, e); - } - } - - public String jobEntityName() { - return ADF_JOB_ENTITY_NAME_PREFIX + id; - } - - public JobType jobType() { - return type; - } - - protected String getClusterName(String linkedServiceName) throws FalconException { - JSONObject linkedService = linkedServicesMap.get(linkedServiceName); - if (linkedService == null) { - throw new FalconException("Linked service " + linkedServiceName + " not found in ADF request."); - } - - try { - return linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES) - .getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES) - .getString(ADFJsonConstants.ADF_REQUEST_CLUSTER_NAME); - } catch (JSONException e) { - throw new FalconException("Error while parsing linked service " + linkedServiceName + " in ADF request."); - } - } - - protected String getRunAsUser() throws FalconException { - if (activityExtendedProperties.has(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER)) { - String runAsUser = null; - try { - runAsUser = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER); - } catch (JSONException e) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " not" - + " found in ADF request."); - } - - if (StringUtils.isBlank(runAsUser)) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " in" - + " ADF request activity extended properties cannot be empty."); - } - return runAsUser; - } else { - String hadoopLinkedService = getHadoopLinkedService(); - JSONObject linkedService = linkedServicesMap.get(hadoopLinkedService); - if (linkedService == null) { - throw new FalconException("JSON object " + hadoopLinkedService + " not" - + " found in ADF request."); - } - - try { - return linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES) - .getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES) - .getString(ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER); - } catch (JSONException e) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_RUN_ON_BEHALF_USER + " not" - + " found in ADF request."); - } - } - } - - protected List<String> getInputTables() throws FalconException { - List<String> tables = new ArrayList<String>(); - try { - JSONArray inputs = message.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY) - .getJSONArray(ADFJsonConstants.ADF_REQUEST_INPUTS); - for (int i = 0; i < 
inputs.length(); i++) { - tables.add(inputs.getJSONObject(i).getString(ADFJsonConstants.ADF_REQUEST_NAME)); - } - } catch (JSONException e) { - throw new FalconException("Error while reading input table names in ADF request."); - } - return tables; - } - - protected List<String> getOutputTables() throws FalconException { - List<String> tables = new ArrayList<String>(); - try { - JSONArray outputs = message.getJSONObject(ADFJsonConstants.ADF_REQUEST_ACTIVITY) - .getJSONArray(ADFJsonConstants.ADF_REQUEST_OUTPUTS); - for (int i = 0; i < outputs.length(); i++) { - tables.add(outputs.getJSONObject(i).getString(ADFJsonConstants.ADF_REQUEST_NAME)); - } - } catch (JSONException e) { - throw new FalconException("Error while reading output table names in ADF request."); - } - return tables; - } - - protected String getADFTablePath(String tableName) throws FalconException { - JSONObject table = tablesMap.get(tableName); - if (table == null) { - throw new FalconException("JSON object " + tableName + " not" - + " found in ADF request."); - } - - try { - JSONObject location = table.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES) - .getJSONObject(ADFJsonConstants.ADF_REQUEST_LOCATION); - String requestType = location.getString(ADFJsonConstants.ADF_REQUEST_TYPE); - if (requestType.equals(ADFJsonConstants.ADF_REQUEST_LOCATION_TYPE_AZURE_BLOB)) { - String blobPath = location.getString(ADFJsonConstants.ADF_REQUEST_FOLDER_PATH); - int index = blobPath.indexOf('/'); - if (index == -1) { - throw new FalconException("Invalid azure blob path: " + blobPath); - } - - String linkedServiceName = location.getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME); - JSONObject linkedService = linkedServicesMap.get(linkedServiceName); - if (linkedService == null) { - throw new FalconException("Can't find linked service " + linkedServiceName + " for azure blob"); - } - String connectionString = linkedService.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES) - .getString(ADFJsonConstants.ADF_REQUEST_CONNECTION_STRING); - int accountNameIndex = connectionString.indexOf(ADFJsonConstants.ADF_REQUEST_BLOB_ACCOUNT_NAME) - + ADFJsonConstants.ADF_REQUEST_BLOB_ACCOUNT_NAME.length(); - String accountName = connectionString.substring(accountNameIndex, - connectionString.indexOf(';', accountNameIndex)); - - StringBuilder blobUrl = new StringBuilder("wasb://") - .append(blobPath.substring(0, index)).append("@") - .append(accountName).append(".blob.core.windows.net") - .append(blobPath.substring(index)); - return blobUrl.toString(); - } - return location.getJSONObject(ADFJsonConstants.ADF_REQUEST_EXTENDED_PROPERTIES) - .getString(ADFJsonConstants.ADF_REQUEST_FOLDER_PATH); - } catch (JSONException e) { - throw new FalconException("Error while parsing ADF JSON message: " + tableName, e); - } - } - - protected String getTableCluster(String tableName) throws FalconException { - JSONObject table = tablesMap.get(tableName); - if (table == null) { - throw new FalconException("Table " + tableName + " not found in ADF request."); - } - - try { - String linkedServiceName = table.getJSONObject(ADFJsonConstants.ADF_REQUEST_PROPERTIES) - .getJSONObject(ADFJsonConstants.ADF_REQUEST_LOCATION) - .getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME); - return getClusterName(linkedServiceName); - } catch (JSONException e) { - throw new FalconException("Error while parsing table cluster " + tableName + " in ADF request."); - } - } - - protected boolean activityHasScriptPath() throws FalconException { - if (JobType.REPLICATION == 
jobType()) { - return false; - } - - return activityExtendedProperties.has( - ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH); - } - - protected String getScriptPath() throws FalconException { - if (!activityHasScriptPath()) { - throw new FalconException("JSON object does not have object: " - + ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH); - } - - try { - String scriptPath = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH); - if (StringUtils.isBlank(scriptPath)) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_SCRIPT_PATH + " not" - + " found or empty in ADF request."); - } - return scriptPath; - } catch (JSONException jsonException) { - throw new FalconException("Error while parsing ADF JSON object: " - + activityExtendedProperties, jsonException); - } - } - - protected String getScriptContent() throws FalconException { - if (activityHasScriptPath()) { - throw new FalconException("JSON object does not have object: " + ADFJsonConstants.ADF_REQUEST_SCRIPT); - } - try { - String script = activityExtendedProperties.getString(ADFJsonConstants.ADF_REQUEST_SCRIPT); - if (StringUtils.isBlank(script)) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_SCRIPT + " cannot" - + " be empty in ADF request."); - } - return script; - } catch (JSONException jsonException) { - throw new FalconException("Error while parsing ADF JSON object: " - + activityExtendedProperties, jsonException); - } - } - - protected String getClusterNameToRunProcessOn() throws FalconException { - return getClusterName(getHadoopLinkedService()); - } - - protected Entity submitAndScheduleJob(String entityType, String msg) throws FalconException { - Entity entity = jobManager.submitJob(entityType, msg); - jobManager.scheduleJob(entityType, jobEntityName()); - return entity; - } - - private String getHadoopLinkedService() throws FalconException { - String hadoopLinkedService; - try { - hadoopLinkedService = activity.getString(ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME); - } catch (JSONException jsonException) { - throw new FalconException("Error while parsing ADF JSON object: " - + activity, jsonException); - } - - if (StringUtils.isBlank(hadoopLinkedService)) { - throw new FalconException("JSON object " + ADFJsonConstants.ADF_REQUEST_LINKED_SERVICE_NAME - + " in the activity cannot be empty in ADF request."); - } - return hadoopLinkedService; - } - - protected void startProcess(Feed inputFeed, Feed outputFeed, - String engineType, String scriptPath) throws FalconException { - // submit input/output feeds - LOG.info("submitting input feed {} for {} process", inputFeed.getName(), engineType); - jobManager.submitJob(EntityType.FEED.name(), inputFeed.getEntityxml()); - - LOG.info("submitting output feed {} for {} process", outputFeed.getName(), engineType); - jobManager.submitJob(EntityType.FEED.name(), outputFeed.getEntityxml()); - - // submit and schedule process - String processRequest = new Process.Builder().withProcessName(jobEntityName()).withFrequency(frequency) - .withStartTime(startTime).withEndTime(endTime).withClusterName(getClusterNameToRunProcessOn()) - .withInputFeedName(inputFeed.getName()).withOutputFeedName(outputFeed.getName()) - .withEngineType(engineType).withWFPath(scriptPath).withAclOwner(proxyUser) - .build().getEntityxml(); - - LOG.info("submitting/scheduling {} process: {}", engineType, processRequest); - submitAndScheduleJob(EntityType.PROCESS.name(), processRequest); - LOG.info("submitted and scheduled {} process: {}", engineType, 
jobEntityName()); - } - - protected void cleanupProcess(Feed inputFeed, Feed outputFeed) throws FalconException { - // delete the entities. Should be called after the job execution success/failure. - jobManager.deleteEntity(EntityType.PROCESS.name(), jobEntityName()); - jobManager.deleteEntity(EntityType.FEED.name(), inputFeed.getName()); - jobManager.deleteEntity(EntityType.FEED.name(), outputFeed.getName()); - - // delete script file - FSUtils.removeDir(new Path(ADFJob.PROCESS_SCRIPTS_PATH, jobEntityName())); - } - - protected String createScriptFile(String fileExt) throws FalconException { - String content = getScriptContent(); - - // create dir; dir path is unique as job name is always unique - final Path dir = new Path(ADFJob.PROCESS_SCRIPTS_PATH, jobEntityName()); - FSUtils.createDir(dir); - - // create script file - final Path path = new Path(dir, jobEntityName() + fileExt); - return FSUtils.createFile(path, content); - } - - private static long parseADFRequestTimeout(String timeout) throws FalconException { - timeout = timeout.trim(); - // [ws][-]{ d | d.hh:mm[:ss[.ff]] | hh:mm[:ss[.ff]] }[ws] - if (timeout.startsWith("-")) { - return -1; - } - - long totalMinutes = 0; - String [] dotParts = timeout.split(Pattern.quote(".")); - if (dotParts.length == 1) { - // no d or ff - // chk if only d - // Formats can be d|hh:mm[:ss] - String[] parts = timeout.split(":"); - if (parts.length == 1) { - // only day. Convert days to minutes - return Integer.parseInt(parts[0]) * 1440; - } else { - // hh:mm[:ss] - return computeMinutes(parts); - } - } - - // if . is present, formats can be d.hh:mm[:ss[.ff]] | hh:mm[:ss[.ff]] - if (dotParts.length == 2) { - // can be d.hh:mm[:ss] or hh:mm[:ss[.ff] - // check if first part has colons - String [] parts = dotParts[0].split(":"); - if (parts.length == 1) { - // format is d.hh:mm[:ss] - totalMinutes = Integer.parseInt(dotParts[0]) * 1440; - parts = dotParts[1].split(":"); - totalMinutes += computeMinutes(parts); - return totalMinutes; - } else { - // format is hh:mm[:ss[.ff] - parts = dotParts[0].split(":"); - totalMinutes += computeMinutes(parts); - // round off ff - totalMinutes += 1; - return totalMinutes; - } - } else if (dotParts.length == 3) { - // will be d.hh:mm[:ss[.ff] - totalMinutes = Integer.parseInt(dotParts[0]) * 1440; - String [] parts = dotParts[1].split(":"); - totalMinutes += computeMinutes(parts); - // round off ff - totalMinutes += 1; - return totalMinutes; - } else { - throw new FalconException("Error parsing policy timeout: " + timeout); - } - } - - // format hh:mm[:ss] - private static long computeMinutes(String[] parts) { - // hh:mm[:ss] - int totalMinutes = Integer.parseInt(parts[0]) * 60; - totalMinutes += Integer.parseInt(parts[1]); - if (parts.length == 3) { - // Second round off to minutes - totalMinutes += 1; - } - return totalMinutes; - } - - private static String transformTimeFormat(String adfTime) { - return adfTime.substring(0, adfTime.length()-4) + "Z"; - } - - protected class ADFJobManager extends AbstractSchedulableEntityManager { - public Entity submitJob(String entityType, String msg) throws FalconException { - try { - InputStream stream = IOUtils.toInputStream(msg); - Entity entity = submitInternal(stream, entityType, proxyUser); - return entity; - } catch (Exception e) { - LOG.info(e.toString()); - throw new FalconException("Error when submitting job: " + e.toString()); - } - } - - public void scheduleJob(String entityType, String entityName) throws FalconException { - try { - scheduleInternal(entityType, 
entityName, null, EntityUtil.getPropertyMap(null)); - } catch (Exception e) { - LOG.info(e.toString()); - throw new FalconException("Error when scheduling job: " + e.toString()); - } - } - - public void deleteEntity(String entityType, String entityName) throws FalconException { - delete(entityType, entityName, null); - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java ---------------------------------------------------------------------- diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java deleted file mode 100644 index ceea6a4..0000000 --- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFJobFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.adfservice; - -import org.apache.falcon.FalconException; - -/** - * Azure ADB Job factory to generate ADFJob for each job type. - */ -public final class ADFJobFactory { - public static ADFJob buildADFJob(String msg, String id) throws FalconException { - ADFJob.JobType jobType = ADFJob.getJobType(msg); - switch (jobType) { - case REPLICATION: - return new ADFReplicationJob(msg, id); - case HIVE: - return new ADFHiveJob(msg, id); - case PIG: - return new ADFPigJob(msg, id); - default: - throw new FalconException("Invalid job type: " + jobType.toString()); - } - } - - private ADFJobFactory() { - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java ---------------------------------------------------------------------- diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java deleted file mode 100644 index 041eb48..0000000 --- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFPigJob.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.falcon.adfservice; - -import org.apache.falcon.FalconException; - -/** - * Azure ADF Pig Job. - */ -public class ADFPigJob extends ADFJob { - private static final String PIG_SCRIPT_EXTENSION = ".pig"; - private static final String ENGINE_TYPE = "pig"; - private static final String INPUT_FEED_SUFFIX = "-pig-input-feed"; - private static final String OUTPUT_FEED_SUFFIX = "-pig-output-feed"; - - private String pigScriptPath; - private DataFeed inputDataFeed; - private DataFeed outputDataFeed; - - public ADFPigJob(String message, String id) throws FalconException { - super(message, id); - type = JobType.PIG; - inputDataFeed = getInputFeed(); - outputDataFeed = getOutputFeed(); - pigScriptPath = activityHasScriptPath() ? getScriptPath() : createScriptFile(PIG_SCRIPT_EXTENSION); - } - - @Override - public void startJob() throws FalconException { - startProcess(inputDataFeed, outputDataFeed, ENGINE_TYPE, pigScriptPath); - } - - @Override - public void cleanup() throws FalconException { - cleanupProcess(inputDataFeed, outputDataFeed); - } - - private DataFeed getInputFeed() throws FalconException { - return getFeed(jobEntityName() + INPUT_FEED_SUFFIX, getInputTables().get(0), - getTableCluster(getInputTables().get(0))); - } - - private DataFeed getOutputFeed() throws FalconException { - return getFeed(jobEntityName() + OUTPUT_FEED_SUFFIX, getOutputTables().get(0), - getTableCluster(getOutputTables().get(0))); - } - - private DataFeed getFeed(final String feedName, final String tableName, - final String clusterName) throws FalconException { - return new DataFeed.Builder().withFeedName(feedName).withFrequency(frequency) - .withClusterName(clusterName).withStartTime(startTime).withEndTime(endTime) - .withAclOwner(proxyUser).withLocationPath(getADFTablePath(tableName)).build(); - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java ---------------------------------------------------------------------- diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java deleted file mode 100644 index 3438b2f..0000000 --- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFProviderService.java +++ /dev/null @@ -1,370 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.falcon.adfservice; - -import com.microsoft.windowsazure.Configuration; -import com.microsoft.windowsazure.exception.ServiceException; -import com.microsoft.windowsazure.services.servicebus.ServiceBusService; -import com.microsoft.windowsazure.services.servicebus.models.BrokeredMessage; -import com.microsoft.windowsazure.services.servicebus.models.ReceiveMessageOptions; -import com.microsoft.windowsazure.services.servicebus.models.ReceiveMode; -import com.microsoft.windowsazure.services.servicebus.models.ReceiveQueueMessageResult; -import com.microsoft.windowsazure.services.servicebus.ServiceBusConfiguration; -import com.microsoft.windowsazure.services.servicebus.ServiceBusContract; - -import java.io.BufferedReader; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.IOException; -import java.util.Collection; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.io.IOUtils; -import org.apache.commons.lang3.StringUtils; -import org.apache.falcon.adfservice.util.ADFJsonConstants; -import org.apache.falcon.FalconException; -import org.apache.falcon.entity.store.ConfigurationStore; -import org.apache.falcon.entity.v0.EntityType; -import org.apache.falcon.resource.AbstractInstanceManager; -import org.apache.falcon.resource.InstancesResult; -import org.apache.falcon.resource.InstancesResult.Instance; -import org.apache.falcon.resource.InstancesResult.WorkflowStatus; -import org.apache.falcon.security.CurrentUser; -import org.apache.falcon.service.FalconService; -import org.apache.falcon.service.Services; -import org.apache.falcon.util.StartupProperties; -import org.apache.falcon.workflow.WorkflowExecutionListener; -import org.apache.falcon.workflow.WorkflowExecutionContext; -import org.apache.falcon.workflow.WorkflowJobEndNotificationService; -import org.json.JSONException; -import org.json.JSONObject; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Falcon ADF provider to handle requests from Azure Data Factory. - */ -public class ADFProviderService implements FalconService, WorkflowExecutionListener { - - private static final Logger LOG = LoggerFactory.getLogger(ADFProviderService.class); - - /** - * Constant for the service name. 
- */ - public static final String SERVICE_NAME = ADFProviderService.class.getSimpleName(); - - private static final int AZURE_SERVICEBUS_RECEIVEMESSGAEOPT_TIMEOUT = 60; - // polling frequency in seconds - private static final int AZURE_SERVICEBUS_DEFAULT_POLLING_FREQUENCY = 10; - - // Number of threads to handle ADF requests - private static final int AZURE_SERVICEBUS_REQUEST_HANDLING_THREADS = 5; - - private static final String AZURE_SERVICEBUS_CONF_PREFIX = "microsoft.windowsazure.services.servicebus."; - private static final String AZURE_SERVICEBUS_CONF_SASKEYNAME = "sasKeyName"; - private static final String AZURE_SERVICEBUS_CONF_SASKEY = "sasKey"; - private static final String AZURE_SERVICEBUS_CONF_SERVICEBUSROOTURI = "serviceBusRootUri"; - private static final String AZURE_SERVICEBUS_CONF_NAMESPACE = "namespace"; - private static final String AZURE_SERVICEBUS_CONF_POLLING_FREQUENCY = "polling.frequency"; - private static final String AZURE_SERVICEBUS_CONF_REQUEST_QUEUE_NAME = "requestqueuename"; - private static final String AZURE_SERVICEBUS_CONF_STATUS_QUEUE_NAME = "statusqueuename"; - private static final String AZURE_SERVICEBUS_CONF_SUPER_USER = "superuser"; - - private static final ConfigurationStore STORE = ConfigurationStore.get(); - - private ServiceBusContract service; - private ScheduledExecutorService adfScheduledExecutorService; - private ReceiveMessageOptions opts = ReceiveMessageOptions.DEFAULT; - private ADFInstanceManager instanceManager = new ADFInstanceManager(); - private String requestQueueName; - private String statusQueueName; - private String superUser; - - @Override - public String getName() { - return SERVICE_NAME; - } - - @Override - public void init() throws FalconException { - // read start up properties for adf configuration - service = ServiceBusService.create(getServiceBusConfig()); - - requestQueueName = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX - + AZURE_SERVICEBUS_CONF_REQUEST_QUEUE_NAME); - if (StringUtils.isBlank(requestQueueName)) { - throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_REQUEST_QUEUE_NAME - + " property not set in startup properties. Please add it."); - } - statusQueueName = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX - + AZURE_SERVICEBUS_CONF_STATUS_QUEUE_NAME); - if (StringUtils.isBlank(statusQueueName)) { - throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_STATUS_QUEUE_NAME - + " property not set in startup properties. Please add it."); - } - - // init opts - opts.setReceiveMode(ReceiveMode.PEEK_LOCK); - opts.setTimeout(AZURE_SERVICEBUS_RECEIVEMESSGAEOPT_TIMEOUT); - - // restart handling - superUser = StartupProperties.get().getProperty( - AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SUPER_USER); - if (StringUtils.isBlank(superUser)) { - throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SUPER_USER - + " property not set in startup properties. 
Please add it."); - } - CurrentUser.authenticate(superUser); - for (EntityType entityType : EntityType.values()) { - Collection<String> entities = STORE.getEntities(entityType); - for (String entityName : entities) { - updateJobStatus(entityName, entityType.toString()); - } - } - - Services.get().<WorkflowJobEndNotificationService>getService( - WorkflowJobEndNotificationService.SERVICE_NAME).registerListener(this); - adfScheduledExecutorService = new ADFScheduledExecutor(AZURE_SERVICEBUS_REQUEST_HANDLING_THREADS); - adfScheduledExecutorService.scheduleWithFixedDelay(new HandleADFRequests(), 0, getDelay(), TimeUnit.SECONDS); - LOG.info("Falcon ADFProvider service initialized"); - } - - private class HandleADFRequests implements Runnable { - - @Override - public void run() { - String sessionID = null; - try { - LOG.info("To read message from adf..."); - ReceiveQueueMessageResult resultQM = - service.receiveQueueMessage(requestQueueName, opts); - BrokeredMessage message = resultQM.getValue(); - if (message != null && message.getMessageId() != null) { - sessionID = message.getReplyToSessionId(); - BufferedReader rd = new BufferedReader( - new InputStreamReader(message.getBody())); - StringBuilder sb = new StringBuilder(); - String line; - while ((line = rd.readLine()) != null) { - sb.append(line); - } - rd.close(); - String msg = sb.toString(); - LOG.info("ADF message: " + msg); - - service.deleteMessage(message); - - ADFJob job = ADFJobFactory.buildADFJob(msg, sessionID); - job.startJob(); - } else { - LOG.info("No message from adf"); - } - } catch (FalconException e) { - if (sessionID != null) { - sendErrorMessage(sessionID, e.toString()); - } - LOG.info(e.toString()); - } catch (ServiceException | IOException e) { - LOG.info(e.toString()); - } - } - } - - private static Configuration getServiceBusConfig() throws FalconException { - String namespace = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX - + AZURE_SERVICEBUS_CONF_NAMESPACE); - if (StringUtils.isBlank(namespace)) { - throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_NAMESPACE - + " property not set in startup properties. Please add it."); - } - - String sasKeyName = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX - + AZURE_SERVICEBUS_CONF_SASKEYNAME); - if (StringUtils.isBlank(sasKeyName)) { - throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SASKEYNAME - + " property not set in startup properties. Please add it."); - } - - String sasKey = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX - + AZURE_SERVICEBUS_CONF_SASKEY); - if (StringUtils.isBlank(sasKey)) { - throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SASKEY - + " property not set in startup properties. Please add it."); - } - - String serviceBusRootUri = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX - + AZURE_SERVICEBUS_CONF_SERVICEBUSROOTURI); - if (StringUtils.isBlank(serviceBusRootUri)) { - throw new FalconException(AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_SERVICEBUSROOTURI - + " property not set in startup properties. 
Please add it."); - } - - LOG.info("namespace: {}, sas key name: {}, sas key: {}, root uri: {}", - namespace, sasKeyName, sasKey, serviceBusRootUri); - return ServiceBusConfiguration.configureWithSASAuthentication(namespace, sasKeyName, sasKey, - serviceBusRootUri); - } - - - // gets delay in seconds - private long getDelay() throws FalconException { - String pollingFrequencyValue = StartupProperties.get().getProperty(AZURE_SERVICEBUS_CONF_PREFIX - + AZURE_SERVICEBUS_CONF_POLLING_FREQUENCY); - long pollingFrequency; - try { - pollingFrequency = (StringUtils.isNotEmpty(pollingFrequencyValue)) - ? Long.parseLong(pollingFrequencyValue) : AZURE_SERVICEBUS_DEFAULT_POLLING_FREQUENCY; - } catch (NumberFormatException nfe) { - throw new FalconException("Invalid value provided for startup property " - + AZURE_SERVICEBUS_CONF_PREFIX + AZURE_SERVICEBUS_CONF_POLLING_FREQUENCY - + ", please provide a valid long number", nfe); - } - return pollingFrequency; - } - - @Override - public void destroy() throws FalconException { - Services.get().<WorkflowJobEndNotificationService>getService( - WorkflowJobEndNotificationService.SERVICE_NAME).unregisterListener(this); - adfScheduledExecutorService.shutdown(); - } - - @Override - public void onSuccess(WorkflowExecutionContext context) throws FalconException { - updateJobStatus(context, ADFJsonConstants.ADF_STATUS_SUCCEEDED, 100); - } - - @Override - public void onFailure(WorkflowExecutionContext context) throws FalconException { - updateJobStatus(context, ADFJsonConstants.ADF_STATUS_FAILED, 0); - } - - @Override - public void onStart(WorkflowExecutionContext context) throws FalconException { - updateJobStatus(context, ADFJsonConstants.ADF_STATUS_EXECUTING, 0); - } - - @Override - public void onSuspend(WorkflowExecutionContext context) throws FalconException { - updateJobStatus(context, ADFJsonConstants.ADF_STATUS_CANCELED, 0); - } - - @Override - public void onWait(WorkflowExecutionContext context) throws FalconException { - updateJobStatus(context, ADFJsonConstants.ADF_STATUS_EXECUTING, 0); - } - - private void updateJobStatus(String entityName, String entityType) throws FalconException { - // Filter non-adf jobs - if (!ADFJob.isADFJobEntity(entityName)) { - return; - } - - Instance instance = instanceManager.getFirstInstance(entityName, entityType); - if (instance == null) { - return; - } - - WorkflowStatus workflowStatus = instance.getStatus(); - String status; - int progress = 0; - switch (workflowStatus) { - case SUCCEEDED: - progress = 100; - status = ADFJsonConstants.ADF_STATUS_SUCCEEDED; - break; - case FAILED: - case KILLED: - case ERROR: - case SKIPPED: - case UNDEFINED: - status = ADFJsonConstants.ADF_STATUS_FAILED; - break; - default: - status = ADFJsonConstants.ADF_STATUS_EXECUTING; - } - updateJobStatus(entityName, status, progress, instance.getLogFile()); - } - - private void updateJobStatus(WorkflowExecutionContext context, String status, int progress) { - // Filter non-adf jobs - String entityName = context.getEntityName(); - if (!ADFJob.isADFJobEntity(entityName)) { - return; - } - - updateJobStatus(entityName, status, progress, context.getLogFile()); - } - - private void updateJobStatus(String entityName, String status, int progress, String logUrl) { - try { - String sessionID = ADFJob.getSessionID(entityName); - LOG.info("To update job status: " + sessionID + ", " + entityName + ", " + status + ", " + logUrl); - JSONObject obj = new JSONObject(); - obj.put(ADFJsonConstants.ADF_STATUS_PROTOCOL, ADFJsonConstants.ADF_STATUS_PROTOCOL_NAME); - 
obj.put(ADFJsonConstants.ADF_STATUS_JOBID, sessionID); - obj.put(ADFJsonConstants.ADF_STATUS_LOG_URL, logUrl); - obj.put(ADFJsonConstants.ADF_STATUS_STATUS, status); - obj.put(ADFJsonConstants.ADF_STATUS_PROGRESS, progress); - sendStatusUpdate(sessionID, obj.toString()); - } catch (JSONException | FalconException e) { - LOG.info("Error when updating job status: " + e.toString()); - } - } - - private void sendErrorMessage(String sessionID, String errorMessage) { - LOG.info("Sending error message for session " + sessionID + ": " + errorMessage); - try { - JSONObject obj = new JSONObject(); - obj.put(ADFJsonConstants.ADF_STATUS_PROTOCOL, ADFJsonConstants.ADF_STATUS_PROTOCOL_NAME); - obj.put(ADFJsonConstants.ADF_STATUS_JOBID, sessionID); - obj.put(ADFJsonConstants.ADF_STATUS_STATUS, ADFJsonConstants.ADF_STATUS_FAILED); - obj.put(ADFJsonConstants.ADF_STATUS_PROGRESS, 0); - obj.put(ADFJsonConstants.ADF_STATUS_ERROR_TYPE, ADFJsonConstants.ADF_STATUS_ERROR_TYPE_VALUE); - obj.put(ADFJsonConstants.ADF_STATUS_ERROR_MESSAGE, errorMessage); - sendStatusUpdate(sessionID, obj.toString()); - } catch (JSONException e) { - LOG.info("Error when sending error message: " + e.toString()); - } - } - - private void sendStatusUpdate(String sessionID, String message) { - LOG.info("Sending update for session " + sessionID + ": " + message); - try { - InputStream in = IOUtils.toInputStream(message, "UTF-8"); - BrokeredMessage updateMessage = new BrokeredMessage(in); - updateMessage.setSessionId(sessionID); - service.sendQueueMessage(statusQueueName, updateMessage); - } catch (IOException | ServiceException e) { - LOG.info("Error when sending status update: " + e.toString()); - } - } - - private static class ADFInstanceManager extends AbstractInstanceManager { - public Instance getFirstInstance(String entityName, String entityType) throws FalconException { - InstancesResult result = getStatus(entityType, entityName, null, null, null, null, "", "", "", 0, 1, null); - Instance[] instances = result.getInstances(); - if (instances.length > 0) { - return instances[0]; - } - return null; - } - } -} http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java ---------------------------------------------------------------------- diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java deleted file mode 100644 index f847a82..0000000 --- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java +++ /dev/null @@ -1,71 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
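For context on the service removed above: getDelay() read the Service Bus polling frequency from startup properties and fell back to a built-in default, failing fast on a malformed value. A minimal standalone sketch of that parse-with-default pattern, using plain java.util.Properties in place of Falcon's StartupProperties (the property key and default below are illustrative stand-ins, not the real Falcon names):

    import java.util.Properties;

    public final class PollingDelayExample {
        // Illustrative names; the real key and default live in the deleted ADFProviderService.
        private static final String POLLING_FREQUENCY_KEY = "adf.servicebus.polling.frequency";
        private static final long DEFAULT_POLLING_FREQUENCY_SECS = 60;

        static long getDelaySeconds(Properties props) {
            String value = props.getProperty(POLLING_FREQUENCY_KEY);
            if (value == null || value.trim().isEmpty()) {
                return DEFAULT_POLLING_FREQUENCY_SECS;  // missing value: use the default
            }
            try {
                return Long.parseLong(value.trim());
            } catch (NumberFormatException nfe) {
                // Malformed value: fail fast, as the removed service did with a FalconException.
                throw new IllegalArgumentException(
                        "Invalid value for " + POLLING_FREQUENCY_KEY + ": " + value, nfe);
            }
        }

        public static void main(String[] args) {
            Properties props = new Properties();
            props.setProperty(POLLING_FREQUENCY_KEY, "120");
            System.out.println(getDelaySeconds(props));  // prints 120
        }
    }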
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java
deleted file mode 100644
index f847a82..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFReplicationJob.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import java.net.URISyntaxException;
-
-import org.apache.falcon.adfservice.util.FSUtils;
-import org.apache.falcon.FalconException;
-import org.apache.falcon.entity.v0.EntityType;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Azure ADF Replication Job (hive/hdfs to Azure blobs).
- */
-public class ADFReplicationJob extends ADFJob {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ADFReplicationJob.class);
-
-    public static final String TEMPLATE_REPLICATION_FEED = "replicate-feed.xml";
-    public static final String REPLICATION_TARGET_CLUSTER = "adf-replication-target-cluster";
-
-    public ADFReplicationJob(String message, String id) throws FalconException {
-        super(message, id);
-        type = JobType.REPLICATION;
-    }
-
-    @Override
-    public void startJob() throws FalconException {
-        try {
-            // Note: in first clickstop, we support only one input table and one output table for replication job
-            String inputTableName = getInputTables().get(0);
-            String outputTableName = getOutputTables().get(0);
-            String template = FSUtils.readHDFSFile(TEMPLATE_PATH_PREFIX, TEMPLATE_REPLICATION_FEED);
-            String message = template.replace("$feedName$", jobEntityName())
-                    .replace("$frequency$", frequency)
-                    .replace("$startTime$", startTime)
-                    .replace("$endTime$", endTime)
-                    .replace("$clusterSource$", getTableCluster(inputTableName))
-                    .replace("$clusterTarget$", REPLICATION_TARGET_CLUSTER)
-                    .replace("$sourceLocation$", getADFTablePath(inputTableName))
-                    .replace("$targetLocation$", getADFTablePath(outputTableName));
-            submitAndScheduleJob(EntityType.FEED.name(), message);
-        } catch (URISyntaxException e) {
-            LOG.info(e.toString());
-        }
-
-    }
-
-    @Override
-    public void cleanup() throws FalconException {
-        // Delete the entities. Should be called after the job execution success/failure.
-        jobManager.deleteEntity(EntityType.FEED.name(), jobEntityName());
-    }
-}
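The removed startJob() above filled the replicate-feed.xml template by chaining String.replace over $token$ placeholders and then submitted the result as a feed entity. A self-contained sketch of that substitution style (the template text and values here are invented stand-ins for the HDFS-hosted template the job actually read):

    public final class FeedTemplateExample {
        public static void main(String[] args) {
            // Toy template; the deleted job loaded replicate-feed.xml via FSUtils.readHDFSFile instead.
            String template = "<feed name='$feedName$' frequency='$frequency$'>"
                    + "<validity start='$startTime$' end='$endTime$'/></feed>";

            String entityXml = template
                    .replace("$feedName$", "adf-replication-demo")
                    .replace("$frequency$", "days(1)")
                    .replace("$startTime$", "2016-03-01T00:00Z")
                    .replace("$endTime$", "2016-12-31T00:00Z");

            System.out.println(entityXml);  // entity XML ready to submit and schedule
        }
    }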
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java
deleted file mode 100644
index df5a993..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/ADFScheduledExecutor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-/**
- * ADF thread pool executor.
- */
-public class ADFScheduledExecutor extends ScheduledThreadPoolExecutor {
-
-    private static final Logger LOG = LoggerFactory.getLogger(ADFScheduledExecutor.class);
-
-    public ADFScheduledExecutor(int corePoolSize) {
-        super(corePoolSize);
-    }
-
-    @Override
-    public ScheduledFuture scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
-        return super.scheduleAtFixedRate(wrapRunnable(command), initialDelay, period, unit);
-    }
-
-    @Override
-    public ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
-        return super.scheduleWithFixedDelay(wrapRunnable(command), initialDelay, delay, unit);
-    }
-
-    private Runnable wrapRunnable(Runnable command) {
-        return new LogOnExceptionRunnable(command);
-    }
-
-    private static class LogOnExceptionRunnable implements Runnable {
-        private Runnable runnable;
-
-        public LogOnExceptionRunnable(Runnable runnable) {
-            super();
-            this.runnable = runnable;
-        }
-
-        @Override
-        public void run() {
-            try {
-                runnable.run();
-            } catch (Throwable t) {
-                LOG.info("Error while executing: {}", t.getMessage());
-                throw new RuntimeException(t);
-            }
-        }
-    }
-}
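The LogOnExceptionRunnable wrapper in the class above exists because ScheduledThreadPoolExecutor does not log an uncaught exception: the failure is captured silently in the returned ScheduledFuture and the task is never rescheduled. Wrapping every task logs the failure before it propagates. A self-contained sketch of the same wrap-and-log idea, using java.util.logging instead of SLF4J so it runs without extra dependencies:

    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public final class LoggingScheduledExecutorExample {
        private static final Logger LOG =
                Logger.getLogger(LoggingScheduledExecutorExample.class.getName());

        // Wrap a task so any failure is logged before it reaches the executor.
        static Runnable logOnException(Runnable task) {
            return () -> {
                try {
                    task.run();
                } catch (Throwable t) {
                    LOG.log(Level.WARNING, "Error while executing scheduled task", t);
                    throw new RuntimeException(t);
                }
            };
        }

        public static void main(String[] args) throws InterruptedException {
            ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
            executor.scheduleWithFixedDelay(
                    logOnException(() -> System.out.println("polling...")), 0, 1, TimeUnit.SECONDS);
            Thread.sleep(3000);  // let the task fire a few times
            executor.shutdown();
        }
    }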
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java
deleted file mode 100644
index 32d2757..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/DataFeed.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import org.apache.falcon.adfservice.util.FSUtils;
-import org.apache.falcon.FalconException;
-
-import java.net.URISyntaxException;
-
-/**
- * Class for data Feed.
- */
-public class DataFeed extends Feed {
-    private static final String FEED_TEMPLATE_FILE = "feed.xml";
-    private String locationPath;
-
-    public DataFeed(final Builder builder) {
-        this.feedName = builder.name;
-        this.clusterName = builder.feedClusterName;
-        this.frequency = builder.feedFrequency;
-        this.startTime = builder.feedStartTime;
-        this.endTime = builder.feedEndTime;
-        this.locationPath = builder.feedLocationPath;
-        this.aclOwner = builder.feedAclOwner;
-    }
-
-    @Override
-    public String getEntityxml() throws FalconException {
-        try {
-            String template = FSUtils.readHDFSFile(ADFJob.TEMPLATE_PATH_PREFIX, FEED_TEMPLATE_FILE);
-            return template.replace("$feedName$", feedName)
-                    .replace("$frequency$", frequency)
-                    .replace("$startTime$", startTime)
-                    .replace("$endTime$", endTime)
-                    .replace("$cluster$", clusterName)
-                    .replace("$location$", locationPath)
-                    .replace("$aclowner$", aclOwner);
-        } catch (URISyntaxException e) {
-            throw new FalconException("Error when generating entity xml for table feed", e);
-        }
-    }
-
-    /**
-     * Builder for table Feed.
-     */
-    public static class Builder {
-        private String name;
-        private String feedClusterName;
-        private String feedFrequency;
-        private String feedStartTime;
-        private String feedEndTime;
-        private String feedLocationPath;
-        private String feedAclOwner;
-
-        public DataFeed build() {
-            return new DataFeed(this);
-        }
-
-        public Builder withFeedName(final String feedName) {
-            this.name = feedName;
-            return this;
-        }
-
-        public Builder withClusterName(final String clusterName) {
-            this.feedClusterName = clusterName;
-            return this;
-        }
-
-        public Builder withFrequency(final String frequency) {
-            this.feedFrequency = frequency;
-            return this;
-        }
-
-        public Builder withStartTime(final String startTime) {
-            this.feedStartTime = startTime;
-            return this;
-        }
-
-        public Builder withEndTime(final String endTime) {
-            this.feedEndTime = endTime;
-            return this;
-        }
-
-        public Builder withLocationPath(final String locationPath) {
-            this.feedLocationPath = locationPath;
-            return this;
-        }
-
-        public Builder withAclOwner(final String aclOwner) {
-            this.feedAclOwner = aclOwner;
-            return this;
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java
deleted file mode 100644
index d05f300..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/Feed.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import org.apache.falcon.FalconException;
-
-/**
- * Abstract class for feed.
- */
-public abstract class Feed {
-    protected String feedName;
-    protected String clusterName;
-    protected String frequency;
-    protected String startTime;
-    protected String endTime;
-    protected String aclOwner;
-
-    public String getName() {
-        return feedName;
-    }
-
-    public abstract String getEntityxml() throws FalconException;
-}
http://git-wip-us.apache.org/repos/asf/falcon/blob/6f5b476c/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java
----------------------------------------------------------------------
diff --git a/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java b/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java
deleted file mode 100644
index 3a65753..0000000
--- a/addons/adf/src/main/java/org/apache/falcon/adfservice/Process.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.falcon.adfservice;
-
-import org.apache.falcon.adfservice.util.FSUtils;
-import org.apache.falcon.FalconException;
-
-import java.net.URISyntaxException;
-
-/**
- * Class for process.
- */
-public class Process {
-    private static final String PROCESS_TEMPLATE_FILE = "process.xml";
-
-    private String entityName;
-    private String frequency;
-    private String startTime;
-    private String endTime;
-    private String clusterName;
-    private String inputFeedName;
-    private String outputFeedName;
-    private String engineType;
-    private String wfPath;
-    private String aclOwner;
-
-    public Process(final Builder builder) {
-        this.entityName = builder.name;
-        this.clusterName = builder.processClusterName;
-        this.frequency = builder.processFrequency;
-        this.startTime = builder.processStartTime;
-        this.endTime = builder.processEndTime;
-        this.inputFeedName = builder.processInputFeedName;
-        this.outputFeedName = builder.processOutputFeedName;
-        this.engineType = builder.processEngineType;
-        this.wfPath = builder.processWfPath;
-        this.aclOwner = builder.processAclOwner;
-    }
-
-    public String getName() {
-        return entityName;
-    }
-
-    public String getEntityxml() throws FalconException {
-        try {
-            String template = FSUtils.readHDFSFile(ADFJob.TEMPLATE_PATH_PREFIX, PROCESS_TEMPLATE_FILE);
-            return template.replace("$processName$", entityName)
-                    .replace("$frequency$", frequency)
-                    .replace("$startTime$", startTime)
-                    .replace("$endTime$", endTime)
-                    .replace("$clusterName$", clusterName)
-                    .replace("$inputFeedName$", inputFeedName)
-                    .replace("$outputFeedName$", outputFeedName)
-                    .replace("$engine$", engineType)
-                    .replace("$scriptPath$", wfPath)
-                    .replace("$aclowner$", aclOwner);
-        } catch (URISyntaxException e) {
-            throw new FalconException("Error when generating process xml", e);
-        }
-    }
-
-    /**
-     * Builder for process.
-     */
-    public static class Builder {
-        private String name;
-        private String processClusterName;
-        private String processFrequency;
-        private String processStartTime;
-        private String processEndTime;
-        private String processInputFeedName;
-        private String processOutputFeedName;
-        private String processEngineType;
-        private String processWfPath;
-        private String processAclOwner;
-
-        public Process build() {
-            return new Process(this);
-        }
-
-        public Builder withProcessName(final String processName) {
-            this.name = processName;
-            return this;
-        }
-
-        public Builder withClusterName(final String clusterName) {
-            this.processClusterName = clusterName;
-            return this;
-        }
-
-        public Builder withFrequency(final String frequency) {
-            this.processFrequency = frequency;
-            return this;
-        }
-
-        public Builder withStartTime(final String startTime) {
-            this.processStartTime = startTime;
-            return this;
-        }
-
-        public Builder withEndTime(final String endTime) {
-            this.processEndTime = endTime;
-            return this;
-        }
-
-        public Builder withInputFeedName(final String inputFeedName) {
-            this.processInputFeedName = inputFeedName;
-            return this;
-        }
-
-        public Builder withOutputFeedName(final String outputFeedName) {
-            this.processOutputFeedName = outputFeedName;
-            return this;
-        }
-
-        public Builder withAclOwner(final String aclOwner) {
-            this.processAclOwner = aclOwner;
-            return this;
-        }
-
-        public Builder withEngineType(final String engineType) {
-            this.processEngineType = engineType;
-            return this;
-        }
-
-        public Builder withWFPath(final String wfPath) {
-            this.processWfPath = wfPath;
-            return this;
-        }
-    }
-
-}
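For readers tracing what this commit removed: Process (like DataFeed above) exposed a fluent Builder whose values getEntityxml() spliced into the process.xml template. A usage sketch of that removed API follows; it will not compile against the tree after this commit, every value is a placeholder, and only the method names are taken from the diff above:

    // Assumes the deleted org.apache.falcon.adfservice.Process class on the classpath;
    // getEntityxml() throws FalconException, so real callers handled that.
    Process process = new Process.Builder()
            .withProcessName("adf-demo-process")
            .withClusterName("demo-cluster")
            .withFrequency("days(1)")
            .withStartTime("2016-03-01T00:00Z")
            .withEndTime("2016-12-31T00:00Z")
            .withInputFeedName("demo-input-feed")
            .withOutputFeedName("demo-output-feed")
            .withEngineType("pig")
            .withWFPath("/apps/falcon/adf/demo-script.pig")
            .withAclOwner("falcon-user")
            .build();
    String entityXml = process.getEntityxml();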
