Repository: falcon
Updated Branches:
  refs/heads/master c52961c6a -> 95bf312f4


http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
new file mode 100644
index 0000000..49b3a12
--- /dev/null
+++ 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions.mirroring.hive;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.catalog.AbstractCatalogService;
+import org.apache.falcon.catalog.CatalogServiceFactory;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.extensions.AbstractExtension;
+import org.apache.falcon.extensions.ExtensionProperties;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.hadoop.conf.Configuration;
+
+import java.util.Properties;
+
+/**
+ * Hive mirroring extension.
+ */
+public class HiveMirroringExtension extends AbstractExtension {
+    private static final String EXTENSION_NAME = "HIVE-MIRRORING";
+    private static final String ALL_TABLES = "*";
+    private static final String COMMA_DELIMITER = ",";
+    private static final String SECURE_RESOURCE = "-secure";
+
+    @Override
+    public String getName() {
+        return EXTENSION_NAME;
+    }
+
+    @Override
+    public void validate(final Properties extensionProperties) throws 
FalconException {
+        for (HiveMirroringExtensionProperties property : 
HiveMirroringExtensionProperties.values()) {
+            if (extensionProperties.getProperty(property.getName()) == null && 
property.isRequired()) {
+                throw new FalconException("Missing extension property: " + 
property.getName());
+            }
+        }
+
+        Cluster srcCluster = 
ClusterHelper.getCluster(HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName());
+        if (srcCluster == null) {
+            throw new FalconException("Cluster entity " + 
HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName()
+                    + " not found");
+        }
+        String srcClusterCatalogUrl = 
ClusterHelper.getRegistryEndPoint(srcCluster);
+        Configuration srcClusterConf = 
ClusterHelper.getConfiguration(srcCluster);
+
+        // Validate if DB exists - source and target
+        String sourceDbList = extensionProperties.getProperty(
+                HiveMirroringExtensionProperties.SOURCE_DATABASES.getName());
+
+        if (StringUtils.isBlank(sourceDbList)) {
+            throw new FalconException("No source DB specified for Hive 
mirroring");
+        }
+
+        AbstractCatalogService catalogService = 
CatalogServiceFactory.getCatalogService();
+        String[] srcDbs = sourceDbList.split(COMMA_DELIMITER);
+        if (srcDbs.length <= 0) {
+            throw new FalconException("No source DB specified for Hive 
mirroring");
+        }
+        for (String db : srcDbs) {
+            if (!catalogService.dbExists(srcClusterConf, srcClusterCatalogUrl, 
db)) {
+                throw new FalconException("Database " + db + " doesn't exist 
on cluster" + srcCluster.getName());
+            }
+        }
+
+        String sourceTableList = extensionProperties.getProperty(
+                HiveMirroringExtensionProperties.SOURCE_TABLES.getName());
+        if (StringUtils.isNotBlank(sourceTableList)) {
+            if (!sourceTableList.equals(ALL_TABLES)) {
+                String db = srcDbs[0];
+                String[] srcTables = sourceTableList.split(COMMA_DELIMITER);
+                for (String table : srcTables) {
+                    if (!catalogService.tableExists(srcClusterConf, 
srcClusterCatalogUrl, db, table)) {
+                        throw new FalconException("Table " + table + " doesn't 
exist on cluster"
+                                + srcCluster.getName());
+                    }
+                }
+            }
+        }
+
+        // Verify db exists on target
+        Cluster targetCluster = 
ClusterHelper.getCluster(HiveMirroringExtensionProperties.TARGET_CLUSTER.getName());
+        if (targetCluster == null) {
+            throw new FalconException("Cluster entity " + 
HiveMirroringExtensionProperties.TARGET_CLUSTER.getName()
+                    + " not found");
+        }
+        String targetClusterCatalogUrl = 
ClusterHelper.getRegistryEndPoint(targetCluster);
+        Configuration targetClusterConf = 
ClusterHelper.getConfiguration(targetCluster);
+
+        for (String db : srcDbs) {
+            if (!catalogService.dbExists(targetClusterConf, 
targetClusterCatalogUrl, db)) {
+                throw new FalconException("Database " + db + " doesn't exist 
on cluster" + targetCluster.getName());
+            }
+        }
+    }
+
+    @Override
+    public Properties getAdditionalProperties(final Properties 
extensionProperties) throws FalconException {
+        Properties additionalProperties = new Properties();
+
+        String jobName = 
extensionProperties.getProperty(ExtensionProperties.JOB_NAME.getName());
+        // Add job name as Hive DR job
+        
additionalProperties.put(HiveMirroringExtensionProperties.HIVE_MIRRORING_JOB_NAME.getName(),
+                jobName + System.currentTimeMillis());
+
+        // Add required properties of cluster where job should run
+        
additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN.getName(),
+                
extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName()));
+        Cluster jobCluster = 
ClusterHelper.getCluster(ExtensionProperties.CLUSTER_NAME.getName());
+        if (jobCluster == null) {
+            throw new FalconException("Cluster entity " + 
ExtensionProperties.CLUSTER_NAME.getName()
+                    + " not found");
+        }
+        
additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN_WRITE_EP.getName(),
+                ClusterHelper.getStorageUrl(jobCluster));
+        if (SecurityUtil.isSecurityEnabled()) {
+            // Add -secure and update the resource name
+            String resourceName = getName().toLowerCase() + SECURE_RESOURCE;
+            
additionalProperties.put(ExtensionProperties.RESOURCE_NAME.getName(), 
resourceName);
+            
additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL.getName(),
+                    ClusterHelper.getPropertyValue(jobCluster, 
SecurityUtil.NN_PRINCIPAL));
+        }
+
+        // Properties for src cluster
+        Cluster srcCluster = 
ClusterHelper.getCluster(HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName());
+        if (srcCluster == null) {
+            throw new FalconException("Cluster entity " + 
HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName()
+                    + " not found");
+        }
+        
additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_METASTORE_URI.getName(),
+                ClusterHelper.getRegistryEndPoint(srcCluster));
+        
additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_NN.getName(),
+                ClusterHelper.getStorageUrl(srcCluster));
+
+        String sourceTableList = extensionProperties.getProperty(
+                HiveMirroringExtensionProperties.SOURCE_TABLES.getName());
+        if (StringUtils.isBlank(sourceTableList)) {
+            
additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_TABLES.getName(),
 ALL_TABLES);
+        }
+
+        if (SecurityUtil.isSecurityEnabled()) {
+            String hive2Principal = 
extensionProperties.getProperty(HiveMirroringExtensionProperties
+                    .SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName());
+            if (StringUtils.isBlank(hive2Principal)) {
+                throw new FalconException("Hive server2 kerberos principal for 
cluster " + srcCluster.getName()
+                        + "not passed for extension " + jobName);
+            }
+
+            
additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName(),
+                    ClusterHelper.getPropertyValue(srcCluster, 
SecurityUtil.NN_PRINCIPAL));
+            additionalProperties.put(
+                    
HiveMirroringExtensionProperties.SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName(),
+                    ClusterHelper.getPropertyValue(srcCluster, 
SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL));
+        }
+
+        // Properties for target cluster
+        Cluster targetCluster = 
ClusterHelper.getCluster(HiveMirroringExtensionProperties.TARGET_CLUSTER.getName());
+        if (targetCluster == null) {
+            throw new FalconException("Cluster entity " + 
HiveMirroringExtensionProperties.TARGET_CLUSTER.getName()
+                    + " not found");
+        }
+        
additionalProperties.put(HiveMirroringExtensionProperties.TARGET_METASTORE_URI.getName(),
+                ClusterHelper.getRegistryEndPoint(targetCluster));
+        
additionalProperties.put(HiveMirroringExtensionProperties.TARGET_NN.getName(),
+                ClusterHelper.getStorageUrl(targetCluster));
+
+        if (SecurityUtil.isSecurityEnabled()) {
+            String hive2Principal = 
extensionProperties.getProperty(HiveMirroringExtensionProperties
+                    .TARGET_HIVE2_KERBEROS_PRINCIPAL.getName());
+            if (StringUtils.isBlank(hive2Principal)) {
+                throw new FalconException("Hive server2 kerberos principal for 
cluster " + targetCluster.getName()
+                        + "not passed for extension " + jobName);
+            }
+
+            
additionalProperties.put(HiveMirroringExtensionProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName(),
+                    ClusterHelper.getPropertyValue(targetCluster, 
SecurityUtil.NN_PRINCIPAL));
+            additionalProperties.put(
+                    
HiveMirroringExtensionProperties.TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName(),
+                    ClusterHelper.getPropertyValue(targetCluster, 
SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL));
+        }
+
+        // Misc properties
+        // Add default properties if not passed
+        String maxEvents = 
extensionProperties.getProperty(HiveMirroringExtensionProperties.MAX_EVENTS.getName());
+        if (StringUtils.isBlank(maxEvents)) {
+            
additionalProperties.put(HiveMirroringExtensionProperties.MAX_EVENTS.getName(), 
"-1");
+        }
+
+        String replicationMaxMaps =
+                
extensionProperties.getProperty(HiveMirroringExtensionProperties.MAX_MAPS.getName());
+        if (StringUtils.isBlank(replicationMaxMaps)) {
+            
additionalProperties.put(HiveMirroringExtensionProperties.MAX_MAPS.getName(), 
"5");
+        }
+
+        String distcpMaxMaps = extensionProperties.getProperty(
+                HiveMirroringExtensionProperties.DISTCP_MAX_MAPS.getName());
+        if (StringUtils.isBlank(distcpMaxMaps)) {
+            
additionalProperties.put(HiveMirroringExtensionProperties.DISTCP_MAX_MAPS.getName(),
 "1");
+        }
+
+        String distcpMapBandwidth = extensionProperties.getProperty(
+                
HiveMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName());
+        if (StringUtils.isBlank(distcpMapBandwidth)) {
+            
additionalProperties.put(HiveMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName(),
 "100");
+        }
+
+        if (StringUtils.isBlank(
+                
extensionProperties.getProperty(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName())))
 {
+            
additionalProperties.put(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName(),
 "false");
+        }
+
+        return additionalProperties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
new file mode 100644
index 0000000..6c4f58d
--- /dev/null
+++ 
b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions.mirroring.hive;
+
+/**
+ * Hive mirroring extension properties.
+ */
+
+public enum HiveMirroringExtensionProperties {
+    SOURCE_CLUSTER("sourceCluster", "Replication source cluster name"),
+    SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri", 
false),
+    SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"),
+    SOURCE_DATABASES("sourceDatabases", "List of databases to replicate"),
+    SOURCE_TABLES("sourceTables", "List of tables to replicate", false),
+    SOURCE_STAGING_PATH("sourceStagingPath", "Location of source staging 
path", false),
+    SOURCE_NN("sourceNN", "Source name node", false),
+    SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal", "Source name 
node kerberos principal", false),
+    
SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL("sourceHiveMetastoreKerberosPrincipal",
+            "Source hive metastore kerberos principal", false),
+    SOURCE_HIVE2_KERBEROS_PRINCIPAL("sourceHive2KerberosPrincipal",
+            "Source hiveserver2 kerberos principal", false),
+
+    TARGET_CLUSTER("targetCluster", "Target cluster name"),
+    TARGET_METASTORE_URI("targetMetastoreUri", "Target Hive metastore uri", 
false),
+    TARGET_HS2_URI("targetHiveServer2Uri", "Target HS2 uri"),
+    TARGET_STAGING_PATH("targetStagingPath", "Location of target staging 
path", false),
+    TARGET_NN("targetNN", "Target name node", false),
+    TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name 
node kerberos principal", false),
+    
TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL("targetHiveMetastoreKerberosPrincipal",
+            "Target hive metastore kerberos principal", false),
+    TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal",
+            "Target hiveserver2 kerberos principal", false),
+
+    MAX_EVENTS("maxEvents", "Maximum events to replicate", false),
+    MAX_MAPS("replicationMaxMaps", "Maximum number of maps used during 
replication", false),
+    DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during 
distcp", false),
+    MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each 
mapper during replication"),
+    CLUSTER_FOR_JOB_RUN("clusterForJobRun", "Cluster on which replication job 
runs", false),
+    CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("Job cluster kerberos principal",
+            "Write EP of cluster on which replication job runs", false),
+    CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of 
cluster on which replication job runs", false),
+    TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE 
encryption is enabled", false),
+    HIVE_MIRRORING_JOB_NAME("jobName", "Unique hive replication job name", 
false);
+
+    private final String name;
+    private final String description;
+    private final boolean isRequired;
+
+    HiveMirroringExtensionProperties(String name, String description) {
+        this(name, description, true);
+    }
+
+    HiveMirroringExtensionProperties(String name, String description, boolean 
isRequired) {
+        this.name = name;
+        this.description = description;
+        this.isRequired = isRequired;
+    }
+
+    public String getName() {
+        return this.name;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+
+    public boolean isRequired() {
+        return isRequired;
+    }
+
+    @Override
+    public String toString() {
+        return getName();
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
 
b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
new file mode 100644
index 0000000..9e07112
--- /dev/null
+++ 
b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
@@ -0,0 +1,215 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions.store;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.entity.store.StoreAccessException;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.HashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store for Falcon extensions.
+ */
+public final class ExtensionStore {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ExtensionStore.class);
+    private FileSystem fs;
+
+    private Path storePath;
+
+    // Convention over configuration design paradigm
+    private static final String RESOURCES_DIR= "resources";
+    private static final String LIBS_DIR= "libs";
+
+    private static final String EXTENSION_STORE_URI = "extension.store.uri";
+
+    private static final ExtensionStore STORE = new ExtensionStore();
+
+    public static ExtensionStore get() {
+        return STORE;
+    }
+
+    private ExtensionStore() {
+        String uri = StartupProperties.get().getProperty(EXTENSION_STORE_URI);
+        if (StringUtils.isEmpty(uri)) {
+            throw new RuntimeException("Property extension.store.uri not set 
in startup properties."
+                    + "Please set it to path of extension deployment on HDFS. 
Extension store init failed");
+        }
+        storePath = new Path(uri);
+        fs = initializeFileSystem();
+    }
+
+    private FileSystem initializeFileSystem() {
+        try {
+            FileSystem fileSystem =
+                    
HadoopClientFactory.get().createFalconFileSystem(storePath.toUri());
+            if (!fileSystem.exists(storePath)) {
+                LOG.info("Creating extension store directory: {}", storePath);
+                // set permissions so config store dir is owned by falcon alone
+                HadoopClientFactory.mkdirs(fileSystem, storePath, 
HadoopClientFactory.ALL_PERMISSION);
+            }
+
+            return fileSystem;
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to bring up extension store for 
path: " + storePath, e);
+        }
+    }
+
+    public Map<String, String> getExtensionArtifacts(final String 
extensionName) throws StoreAccessException {
+        Map<String, String> extensionFileMap = new HashMap<>();
+        try {
+            Path extensionPath = new Path(storePath, 
extensionName.toLowerCase());
+            RemoteIterator<LocatedFileStatus> fileStatusListIterator = 
fs.listFiles(extensionPath, true);
+
+            if (!fileStatusListIterator.hasNext()) {
+                throw new StoreAccessException(new Exception(" For extension " 
+ extensionName
+                        + " there are no artifacts at the extension store path 
" + storePath));
+            }
+            while (fileStatusListIterator.hasNext()) {
+                LocatedFileStatus fileStatus = fileStatusListIterator.next();
+                Path filePath = 
Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath());
+                extensionFileMap.put(filePath.getName(), filePath.toString());
+            }
+        } catch (IOException e) {
+            throw new StoreAccessException(e);
+        }
+        return extensionFileMap;
+    }
+
+    public Map<String, String> getExtensionResources(final String 
extensionName) throws StoreAccessException {
+        Map<String, String> extensionFileMap = new HashMap<>();
+        try {
+            Path extensionPath = new Path(storePath, 
extensionName.toLowerCase());
+
+            Path resourcesPath = null;
+            FileStatus[] files = fs.listStatus(extensionPath);
+
+            for (FileStatus fileStatus : files) {
+                if 
(fileStatus.getPath().getName().equalsIgnoreCase(RESOURCES_DIR)) {
+                    resourcesPath = fileStatus.getPath();
+                    break;
+                }
+            }
+
+            if (resourcesPath == null) {
+                throw new StoreAccessException(new Exception(" For extension " 
+ extensionName
+                        + " there is no " + RESOURCES_DIR + "at the extension 
store path " + storePath));
+            }
+            RemoteIterator<LocatedFileStatus> fileStatusListIterator = 
fs.listFiles(resourcesPath, true);
+            while (fileStatusListIterator.hasNext()) {
+                LocatedFileStatus fileStatus = fileStatusListIterator.next();
+                Path filePath = 
Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath());
+                extensionFileMap.put(filePath.getName(), filePath.toString());
+            }
+        } catch (IOException e) {
+            throw new StoreAccessException(e);
+        }
+        return extensionFileMap;
+    }
+
+    public String getExtensionLibPath(final String extensionName) throws 
StoreAccessException {
+        try {
+            Path extensionPath = new Path(storePath, 
extensionName.toLowerCase());
+
+            Path libsPath = null;
+            FileStatus[] files = fs.listStatus(extensionPath);
+
+            for (FileStatus fileStatus : files) {
+                if (fileStatus.getPath().getName().equalsIgnoreCase(LIBS_DIR)) 
{
+                    if (fileStatus.getLen() != 0) {
+                        libsPath = 
Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath());
+                    }
+                    break;
+                }
+            }
+
+            if (libsPath == null) {
+                LOG.info("For extension " + extensionName + " there is no "
+                        + LIBS_DIR + "at the extension store path " + 
storePath);
+                return null;
+            } else {
+                return libsPath.toString();
+            }
+        } catch (IOException e) {
+            throw new StoreAccessException(e);
+        }
+    }
+
+    public String getExtensionResource(final String resourcePath) throws 
StoreAccessException {
+        if (StringUtils.isBlank(resourcePath)) {
+            throw new StoreAccessException(new Exception("Resource path cannot 
be null or empty"));
+        }
+
+        try {
+            Path resourceFile = new Path(resourcePath);
+
+            ByteArrayOutputStream writer = new ByteArrayOutputStream();
+            InputStream data = fs.open(resourceFile);
+            IOUtils.copyBytes(data, writer, fs.getConf(), true);
+            return writer.toString();
+        } catch (IOException e) {
+            throw new StoreAccessException(e);
+        }
+    }
+
+    public List<String> getExtensions() throws StoreAccessException {
+        List<String> extesnionList = new ArrayList<>();
+        try {
+            FileStatus[] fileStatuses = fs.listStatus(storePath);
+
+            for (FileStatus fileStatus : fileStatuses) {
+                if (fileStatus.isDirectory()) {
+                    Path filePath = 
Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath());
+                    extesnionList.add(filePath.getName());
+                }
+            }
+        } catch (IOException e) {
+            throw new StoreAccessException(e);
+        }
+        return extesnionList;
+    }
+
+    public Path getExtensionStorePath() {
+        return storePath;
+    }
+
+    public boolean isExtensionStoreInitialized() {
+        return (storePath != null);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
 
b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
new file mode 100644
index 0000000..92e9805
--- /dev/null
+++ 
b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java
@@ -0,0 +1,309 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions.util;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.Frequency;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.process.ACL;
+import org.apache.falcon.entity.v0.process.Cluster;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.entity.v0.process.Notification;
+import org.apache.falcon.entity.v0.process.PolicyType;
+import org.apache.falcon.entity.v0.process.Property;
+import org.apache.falcon.entity.v0.process.Retry;
+import org.apache.falcon.entity.v0.process.Workflow;
+import org.apache.falcon.extensions.ExtensionProperties;
+import org.apache.falcon.security.SecurityUtil;
+import org.apache.falcon.util.NotificationType;
+
+import javax.xml.bind.Unmarshaller;
+import javax.xml.bind.ValidationEvent;
+import javax.xml.bind.ValidationEventHandler;
+import java.io.StringReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.TimeZone;
+
+
+/**
+ * Extension builder utility.
+ */
+public final class ExtensionProcessBuilderUtils {
+
+    private static final Pattern EXTENSION_VAR_PATTERN = 
Pattern.compile("##[A-Za-z0-9_.]*##");
+
+    private ExtensionProcessBuilderUtils() {
+    }
+
+    public static Entity createProcessFromTemplate(final String 
processTemplate,
+                                                         final String 
extensionName,
+                                                         final Properties 
extensionProperties,
+                                                         final String wfPath,
+                                                         final String 
wfLibPath) throws FalconException {
+        if (StringUtils.isBlank(processTemplate) || 
StringUtils.isBlank(extensionName)
+                || extensionProperties == null || StringUtils.isBlank(wfPath)) 
{
+            throw new FalconException("Invalid arguments passed to extension 
builder");
+        }
+        org.apache.falcon.entity.v0.process.Process process = 
bindAttributesInTemplate(
+                processTemplate, extensionProperties, extensionName, wfPath, 
wfLibPath);
+
+        validateGeneratedProcess(process.toString());
+        return process;
+    }
+
+    private static org.apache.falcon.entity.v0.process.Process
+    bindAttributesInTemplate(final String processTemplate, final Properties 
extensionProperties,
+                             final String extensionName, final String wfPath,
+                             final String wfLibPath)
+        throws FalconException {
+        if (StringUtils.isBlank(processTemplate) || extensionProperties == 
null) {
+            throw new FalconException("Process template or properties cannot 
be null");
+        }
+
+        org.apache.falcon.entity.v0.process.Process process;
+        try {
+            Unmarshaller unmarshaller = EntityType.PROCESS.getUnmarshaller();
+            // Validation can be skipped for unmarshalling as we want to bind 
template with the properties.
+            // Vaildation is handled as part of marshalling
+            unmarshaller.setSchema(null);
+            unmarshaller.setEventHandler(new ValidationEventHandler() {
+                    public boolean handleEvent(ValidationEvent 
validationEvent) {
+                        return true;
+                    }
+                }
+            );
+            process = (org.apache.falcon.entity.v0.process.Process)
+                    unmarshaller.unmarshal(new StringReader(processTemplate));
+        } catch (Exception e) {
+            throw new FalconException(e);
+        }
+
+        /* For optional properties user might directly set them in the process 
xml and might not set it in properties
+           file. Before doing the submission validation is done to confirm 
process xml doesn't have
+           EXTENSION_VAR_PATTERN
+        */
+
+        String processName = 
extensionProperties.getProperty(ExtensionProperties.JOB_NAME.getName());
+        if (StringUtils.isNotEmpty(processName)) {
+            process.setName(processName);
+        }
+
+        // DR process template has only one cluster
+        bindClusterProperties(process.getClusters().getClusters().get(0), 
extensionProperties);
+
+        // bind scheduling properties
+        String processFrequency = 
extensionProperties.getProperty(ExtensionProperties.FREQUENCY.getName());
+        if (StringUtils.isNotEmpty(processFrequency)) {
+            process.setFrequency(Frequency.fromString(processFrequency));
+        }
+
+        String zone = 
extensionProperties.getProperty(ExtensionProperties.TIMEZONE.getName());
+        if (StringUtils.isNotBlank(zone)) {
+            process.setTimezone(TimeZone.getTimeZone(zone));
+        } else {
+            process.setTimezone(TimeZone.getTimeZone("UTC"));
+        }
+
+        bindWorkflowProperties(process.getWorkflow(), extensionName, wfPath, 
wfLibPath);
+        bindRetryProperties(process.getRetry(), extensionProperties);
+        bindNotificationProperties(process.getNotification(), 
extensionProperties);
+        bindACLProperties(process.getACL(), extensionProperties);
+        bindTagsProperties(process, extensionProperties);
+        bindCustomProperties(process.getProperties(), extensionProperties);
+
+        return process;
+    }
+
+    private static void bindClusterProperties(final Cluster cluster,
+                                              final Properties 
extensionProperties) {
+        String clusterName = 
extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName());
+        if (StringUtils.isNotEmpty(clusterName)) {
+            cluster.setName(clusterName);
+        }
+        String clusterStartValidity = 
extensionProperties.getProperty(ExtensionProperties.VALIDITY_START.getName());
+        if (StringUtils.isNotEmpty(clusterStartValidity)) {
+            
cluster.getValidity().setStart(SchemaHelper.parseDateUTC(clusterStartValidity));
+        }
+
+        String clusterEndValidity = 
extensionProperties.getProperty(ExtensionProperties.VALIDITY_END.getName());
+        if (StringUtils.isNotEmpty(clusterEndValidity)) {
+            
cluster.getValidity().setEnd(SchemaHelper.parseDateUTC(clusterEndValidity));
+        }
+    }
+
+    private static void bindWorkflowProperties(final Workflow wf,
+                                               final String extensionName,
+                                               final String wfPath,
+                                               final String wfLibPath) {
+        final EngineType defaultEngineType = EngineType.OOZIE;
+        final String workflowNameSuffix = "-workflow";
+
+        wf.setName(extensionName + workflowNameSuffix);
+        wf.setEngine(defaultEngineType);
+        wf.setPath(wfPath);
+        if (StringUtils.isNotEmpty(wfLibPath)) {
+            wf.setLib(wfLibPath);
+        } else {
+            wf.setLib("");
+        }
+    }
+
+    private static void bindRetryProperties(final Retry processRetry,
+                                            final Properties 
extensionProperties) {
+        final PolicyType defaultRetryPolicy = PolicyType.PERIODIC;
+        final int defaultRetryAttempts = 3;
+        final Frequency defaultRetryDelay = new Frequency("minutes(30)");
+
+        String retryPolicy = 
extensionProperties.getProperty(ExtensionProperties.RETRY_POLICY.getName());
+        if (StringUtils.isNotBlank(retryPolicy)) {
+            processRetry.setPolicy(PolicyType.fromValue(retryPolicy));
+        } else {
+            processRetry.setPolicy(defaultRetryPolicy);
+        }
+
+        String retryAttempts = 
extensionProperties.getProperty(ExtensionProperties.RETRY_ATTEMPTS.getName());
+        if (StringUtils.isNotBlank(retryAttempts)) {
+            processRetry.setAttempts(Integer.parseInt(retryAttempts));
+        } else {
+            processRetry.setAttempts(defaultRetryAttempts);
+        }
+
+        String retryDelay = 
extensionProperties.getProperty(ExtensionProperties.RETRY_DELAY.getName());
+        if (StringUtils.isNotBlank(retryDelay)) {
+            processRetry.setDelay(Frequency.fromString(retryDelay));
+        } else {
+            processRetry.setDelay(defaultRetryDelay);
+        }
+
+        String retryOnTimeout = 
extensionProperties.getProperty(ExtensionProperties.RETRY_ON_TIMEOUT.getName());
+        if (StringUtils.isNotBlank(retryOnTimeout)) {
+            processRetry.setOnTimeout(Boolean.valueOf(retryOnTimeout));
+        } else {
+            processRetry.setOnTimeout(false);
+        }
+    }
+
+    private static void bindNotificationProperties(final Notification 
processNotification,
+                                                   final Properties 
extensionProperties) {
+        final String defaultNotificationType = 
NotificationType.EMAIL.getName();
+
+        String notificationType = extensionProperties.getProperty(
+                ExtensionProperties.JOB_NOTIFICATION_TYPE.getName());
+        if (StringUtils.isNotBlank(notificationType)) {
+            processNotification.setType(notificationType);
+        } else {
+            processNotification.setType(defaultNotificationType);
+        }
+
+        String notificationAddress = extensionProperties.getProperty(
+                ExtensionProperties.JOB_NOTIFICATION_ADDRESS.getName());
+        if (StringUtils.isNotBlank(notificationAddress)) {
+            processNotification.setTo(notificationAddress);
+        } else {
+            processNotification.setTo("NA");
+        }
+    }
+
+    private static void bindACLProperties(final ACL acl,
+                                          final Properties 
extensionProperties) throws FalconException {
+        if (!SecurityUtil.isAuthorizationEnabled()) {
+            return;
+        }
+
+        String aclowner = 
extensionProperties.getProperty(ExtensionProperties.JOB_ACL_OWNER.getName());
+        if (StringUtils.isNotEmpty(aclowner)) {
+            acl.setOwner(aclowner);
+        } else {
+            throw new FalconException("ACL owner extension property cannot be 
null or empty when authorization is "
+                    + "enabled");
+        }
+
+        String aclGroup = 
extensionProperties.getProperty(ExtensionProperties.JOB_ACL_GROUP.getName());
+        if (StringUtils.isNotEmpty(aclGroup)) {
+            acl.setGroup(aclGroup);
+        } else {
+            throw new FalconException("ACL group extension property cannot be 
null or empty when authorization is "
+                    + "enabled");
+        }
+
+        String aclPermission = 
extensionProperties.getProperty(ExtensionProperties.JOB_ACL_PERMISSION.getName());
+        if (StringUtils.isNotEmpty(aclPermission)) {
+            acl.setPermission(aclPermission);
+        } else {
+            throw new FalconException("ACL permission extension property 
cannot be null or empty when authorization is "
+                    + "enabled");
+        }
+    }
+
+    private static void bindTagsProperties(final 
org.apache.falcon.entity.v0.process.Process process,
+                                           final Properties 
extensionProperties) {
+        String falconSystemTags = process.getTags();
+        String tags = 
extensionProperties.getProperty(ExtensionProperties.JOB_TAGS.getName());
+        if (StringUtils.isNotEmpty(tags)) {
+            if (StringUtils.isNotEmpty(falconSystemTags)) {
+                tags += ", " + falconSystemTags;
+            }
+            process.setTags(tags);
+        }
+    }
+
+
+    private static void bindCustomProperties(final 
org.apache.falcon.entity.v0.process.Properties customProperties,
+                                             final Properties 
extensionProperties) {
+        List<Property> propertyList = new ArrayList<>();
+
+        for (Map.Entry<Object, Object> extensionProperty : 
extensionProperties.entrySet()) {
+            if 
(ExtensionProperties.getOptionsMap().get(extensionProperty.getKey().toString()) 
== null) {
+                addProperty(propertyList, (String) extensionProperty.getKey(), 
(String) extensionProperty.getValue());
+            }
+        }
+
+        customProperties.getProperties().addAll(propertyList);
+    }
+
+    private static void addProperty(List<Property> propertyList, String name, 
String value) {
+        Property prop = new Property();
+        prop.setName(name);
+        prop.setValue(value);
+        propertyList.add(prop);
+    }
+
+    private static void validateGeneratedProcess(final String 
generatedProcess) throws FalconException {
+        if (StringUtils.isBlank(generatedProcess)) {
+            throw new IllegalArgumentException("Invalid arguments passed");
+        }
+
+        Matcher matcher = EXTENSION_VAR_PATTERN.matcher(generatedProcess);
+        if (matcher.find()) {
+            String variable = generatedProcess.substring(matcher.start(), 
matcher.end());
+            throw new FalconException("Match not found for the template: " + 
variable
+                    + " in extension template file. Please add it in extension 
properties file");
+        }
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/java/org/apache/falcon/extensions/ExtensionServiceTest.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionServiceTest.java
 
b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionServiceTest.java
new file mode 100644
index 0000000..c8df2c0
--- /dev/null
+++ 
b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionServiceTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions;
+
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import org.testng.Assert;
+
+/**
+ * Unit tests for ExtensionService.
+ */
+public class ExtensionServiceTest {
+
+    private ExtensionService service;
+
+    @BeforeClass
+    public void setUp() throws Exception {
+        service = new ExtensionService();
+        service.init();
+    }
+
+    @AfterClass
+    public void tearDown() throws Exception {
+        service.destroy();
+    }
+
+    @Test
+    public void testGetName() throws Exception {
+        Assert.assertEquals(service.getName(), ExtensionService.SERVICE_NAME);
+    }
+
+    @Test
+    public void testGetextensionStore() throws Exception {
+        Assert.assertNotNull(ExtensionService.getExtensionStore());
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java 
b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
new file mode 100644
index 0000000..ffd9336
--- /dev/null
+++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions;
+
+import junit.framework.Assert;
+import org.apache.falcon.FalconException;
+import org.apache.falcon.entity.ClusterHelper;
+import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.parser.EntityParserFactory;
+import org.apache.falcon.entity.parser.ProcessEntityParser;
+import org.apache.falcon.entity.store.ConfigurationStore;
+import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.EntityType;
+import org.apache.falcon.entity.v0.SchemaHelper;
+import org.apache.falcon.entity.v0.cluster.Cluster;
+import org.apache.falcon.entity.v0.process.EngineType;
+import org.apache.falcon.entity.v0.process.PolicyType;
+import org.apache.falcon.entity.v0.process.Process;
+import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension;
+import 
org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtensionProperties;
+import org.apache.falcon.extensions.store.AbstractTestExtensionStore;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * Tests for Extension.
+ */
+public class ExtensionTest extends AbstractTestExtensionStore {
+    private static final String PRIMARY_CLUSTER_XML = 
"/primary-cluster-0.1.xml";
+    private static final String BACKUP_CLUSTER_XML = "/backup-cluster-0.1.xml";
+    private static final String JOB_NAME = "hdfs-mirroring-monthly";
+    private static final String JOB_CLUSTER_NAME = "primaryCluster";
+    private static final String VALIDITY_START = "2016-01-02T00:00Z";
+    private static final String VALIDITY_END = "2018-01-02T00:00Z";
+    private static final String FREQUENCY = "days(1)";
+    private static final String SOURCEDIR = "/users/source/file1";
+    private static final String SOURCE_CLUSTER = "primaryCluster";
+    private static final String TARGETDIR = "/users/target/file1";
+    private static final String TARGET_CLUSTER = "backupCluster";
+    private Extension extension;
+
+    private static Properties getHdfsProperties() {
+        Properties properties = new Properties();
+        properties.setProperty(ExtensionProperties.JOB_NAME.getName(),
+                JOB_NAME);
+        properties.setProperty(ExtensionProperties.CLUSTER_NAME.getName(),
+                JOB_CLUSTER_NAME);
+        properties.setProperty(ExtensionProperties.VALIDITY_START.getName(),
+                VALIDITY_START);
+        properties.setProperty(ExtensionProperties.VALIDITY_END.getName(),
+                VALIDITY_END);
+        properties.setProperty(ExtensionProperties.FREQUENCY.getName(),
+                FREQUENCY);
+        
properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(),
+                SOURCEDIR);
+        
properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName(),
+                SOURCE_CLUSTER);
+        
properties.setProperty(HdfsMirroringExtensionProperties.TARGET_DIR.getName(),
+                TARGETDIR);
+        
properties.setProperty(HdfsMirroringExtensionProperties.TARGET_CLUSTER.getName(),
+                TARGET_CLUSTER);
+
+        return properties;
+    }
+
+    @BeforeClass
+    public void init() throws Exception {
+        extension = new Extension();
+        initClusters();
+    }
+
+    private void initClusters() throws Exception {
+        InputStream inputStream = 
getClass().getResourceAsStream(PRIMARY_CLUSTER_XML);
+        Cluster primaryCluster = (Cluster) 
EntityType.CLUSTER.getUnmarshaller().unmarshal(inputStream);
+        ConfigurationStore.get().publish(EntityType.CLUSTER, primaryCluster);
+
+        inputStream = getClass().getResourceAsStream(BACKUP_CLUSTER_XML);
+        Cluster backupCluster = (Cluster) 
EntityType.CLUSTER.getUnmarshaller().unmarshal(inputStream);
+        ConfigurationStore.get().publish(EntityType.CLUSTER, backupCluster);
+    }
+
+    @Test
+    public void testGetExtensionEntitiesForHdfsMirroring() throws 
FalconException {
+        ProcessEntityParser parser = (ProcessEntityParser) 
EntityParserFactory.getParser(EntityType.PROCESS);
+
+        List<Entity> entities = extension.getEntities(new 
HdfsMirroringExtension().getName(), getHdfsProperties());
+        if (entities == null || entities.isEmpty()) {
+            Assert.fail("Entities returned cannot be null or empty");
+        }
+
+        Assert.assertEquals(1, entities.size());
+        Entity entity = entities.get(0);
+
+
+        Assert.assertEquals(EntityType.PROCESS, entity.getEntityType());
+        parser.parse(new ByteArrayInputStream(entity.toString().getBytes()));
+
+        // Validate
+        Process processEntity = (Process) entity;
+        Assert.assertEquals(JOB_NAME, processEntity.getName());
+        org.apache.falcon.entity.v0.process.Cluster jobCluster = 
processEntity.getClusters().
+                getClusters().get(0);
+        Assert.assertEquals(JOB_CLUSTER_NAME, jobCluster.getName());
+        Assert.assertEquals(VALIDITY_START, 
SchemaHelper.formatDateUTC(jobCluster.getValidity().getStart()));
+        Assert.assertEquals(VALIDITY_END, 
SchemaHelper.formatDateUTC(jobCluster.getValidity().getEnd()));
+
+        Assert.assertEquals(FREQUENCY, 
processEntity.getFrequency().toString());
+        Assert.assertEquals("UTC", processEntity.getTimezone().getID());
+
+        Assert.assertEquals(EngineType.OOZIE, 
processEntity.getWorkflow().getEngine());
+        Assert.assertEquals(extensionStorePath + "/hdfs-mirroring/libs",
+                processEntity.getWorkflow().getLib());
+        Assert.assertEquals(extensionStorePath
+                + 
"/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml",
+                processEntity.getWorkflow().getPath());
+
+        Properties props = EntityUtil.getEntityProperties(processEntity);
+
+        String srcClusterEndPoint = 
ClusterHelper.getReadOnlyStorageUrl(ClusterHelper.getCluster(SOURCE_CLUSTER));
+        Assert.assertEquals(srcClusterEndPoint + SOURCEDIR, 
props.getProperty("sourceDir"));
+        Assert.assertEquals(SOURCE_CLUSTER, 
props.getProperty("sourceCluster"));
+        Assert.assertEquals(TARGETDIR, props.getProperty("targetDir"));
+        Assert.assertEquals(TARGET_CLUSTER, 
props.getProperty("targetCluster"));
+
+        //retry
+        Assert.assertEquals(3, processEntity.getRetry().getAttempts());
+        Assert.assertEquals(PolicyType.PERIODIC, 
processEntity.getRetry().getPolicy());
+        Assert.assertEquals("minutes(30)", 
processEntity.getRetry().getDelay().toString());
+    }
+
+    @Test(expectedExceptions = FalconException.class,
+            expectedExceptionsMessageRegExp = "Missing extension property: 
jobClusterName")
+    public void 
testGetExtensionEntitiesForHdfsMirroringMissingMandatoryProperties() throws 
FalconException {
+        Properties props = getHdfsProperties();
+        props.remove(ExtensionProperties.CLUSTER_NAME.getName());
+
+        extension.getEntities(new HdfsMirroringExtension().getName(), props);
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java
 
b/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java
new file mode 100644
index 0000000..b62b475
--- /dev/null
+++ 
b/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions.store;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.falcon.extensions.AbstractExtension;
+import org.apache.falcon.extensions.ExtensionService;
+import org.apache.falcon.hadoop.HadoopClientFactory;
+import org.apache.falcon.util.StartupProperties;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+
+/**
+ *  Abstract class to setup extension store.
+*/
+public class AbstractTestExtensionStore {
+
+    protected String extensionStorePath;
+    protected ExtensionStore store;
+    private FileSystem fileSystem;
+
+    @BeforeClass
+    public void initConfigStore() throws Exception {
+        String configPath = new 
URI(StartupProperties.get().getProperty("extension.store.uri")).getPath();
+        extensionStorePath = configPath + "-" + getClass().getName();
+        StartupProperties.get().setProperty("extension.store.uri", 
extensionStorePath);
+        new ExtensionService().init();
+        store = ExtensionService.getExtensionStore();
+        fileSystem = HadoopClientFactory.get().createFalconFileSystem(new 
Configuration(true));
+
+        extensionStoreSetup();
+    }
+
+    private void extensionStoreSetup() throws IOException {
+        List<AbstractExtension> extensions = AbstractExtension.getExtensions();
+        for (AbstractExtension extension : extensions) {
+            String extensionName = extension.getName().toLowerCase();
+            Path extensionPath = new Path(extensionStorePath, extensionName);
+            Path libPath = new Path(extensionPath, "libs");
+            Path resourcesPath = new Path(extensionPath, "resources");
+            HadoopClientFactory.mkdirs(fileSystem, extensionPath,
+                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
+            HadoopClientFactory.mkdirs(fileSystem, new Path(extensionPath, new 
Path("README")),
+                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
+
+            HadoopClientFactory.mkdirs(fileSystem, libPath,
+                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
+            HadoopClientFactory.mkdirs(fileSystem, new Path(libPath, "build"),
+                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
+            HadoopClientFactory.mkdirs(fileSystem, new Path(libPath, 
"runtime"),
+                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
+
+            HadoopClientFactory.mkdirs(fileSystem, resourcesPath,
+                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
+            HadoopClientFactory.mkdirs(fileSystem, new Path(resourcesPath, 
"build"),
+                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
+            Path runTimeResourcePath = new Path(resourcesPath, "runtime");
+            HadoopClientFactory.mkdirs(fileSystem, runTimeResourcePath,
+                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
+
+            fileSystem.create(new Path(runTimeResourcePath, extensionName + 
"-workflow.xml"));
+            Path dstFile = new Path(runTimeResourcePath, extensionName + 
"-template.xml");
+            fileSystem.create(dstFile);
+            String srcFile = extensionName + "-template.xml";
+            fileSystem.copyFromLocalFile(new Path(getAbsolutePath(srcFile)), 
dstFile);
+        }
+
+    }
+
+    private String getAbsolutePath(String fileName) {
+        return this.getClass().getResource("/" + fileName).getPath();
+    }
+
+
+    @AfterClass
+    public void cleanUp() throws Exception {
+        FileUtils.deleteDirectory(new File(extensionStorePath));
+    }
+}

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
 
b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
new file mode 100644
index 0000000..3462321
--- /dev/null
+++ 
b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.falcon.extensions.store;
+
+import com.google.common.collect.ImmutableMap;
+import org.apache.falcon.entity.store.StoreAccessException;
+import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import java.util.Map;
+
+/**
+ *  Tests for extension store.
+ */
+public class ExtensionStoreTest extends AbstractTestExtensionStore {
+    private static Map<String, String> resourcesMap;
+
+    @BeforeClass
+    public void init() throws Exception {
+        resourcesMap = ImmutableMap.of(
+                "hdfs-mirroring-template.xml", extensionStorePath
+                        + 
"/hdfs-mirroring/resources/runtime/hdfs-mirroring-template.xml",
+                "hdfs-mirroring-workflow.xml", extensionStorePath
+                        + 
"/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml"
+        );
+    }
+
+    @Test
+    public void testGetExtensionResources() throws StoreAccessException {
+        String extensionName = new HdfsMirroringExtension().getName();
+        Map<String, String> resources = 
store.getExtensionResources(extensionName);
+
+        for (Map.Entry<String, String> entry : resources.entrySet()) {
+            String path = resourcesMap.get(entry.getKey());
+            Assert.assertEquals(entry.getValue(), path);
+        }
+    }
+
+    @Test
+    public void testGetExtensionLibPath() throws StoreAccessException {
+        String extensionName = new HdfsMirroringExtension().getName();
+        String libPath = extensionStorePath + "/hdfs-mirroring/libs";
+        Assert.assertEquals(store.getExtensionLibPath(extensionName), libPath);
+    }
+
+}
+

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/resources/backup-cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/backup-cluster-0.1.xml 
b/extensions/src/test/resources/backup-cluster-0.1.xml
new file mode 100644
index 0000000..c3ba6b9
--- /dev/null
+++ b/extensions/src/test/resources/backup-cluster-0.1.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<cluster colo="gs" description="" name="backupCluster" 
xmlns="uri:falcon:cluster:0.1"
+        >
+    <interfaces>
+        <interface type="readonly" endpoint="hftp://localhost:50010";
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/";
+                   version="4.0"/>
+        <interface type="messaging" 
endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="Hcat" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/falcon/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+    </locations>
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+        <property name="hive.metastore.client.socket.timeout" value="20"/>
+    </properties>
+</cluster>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/resources/hdfs-mirroring-template.xml
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/hdfs-mirroring-template.xml 
b/extensions/src/test/resources/hdfs-mirroring-template.xml
new file mode 100644
index 0000000..6c35c5b
--- /dev/null
+++ b/extensions/src/test/resources/hdfs-mirroring-template.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<process name="##job.name##" xmlns="uri:falcon:process:0.1">
+    <clusters>
+        <!--  source  -->
+        <cluster name="##job.cluster.name##">
+            <validity end="##job.validity.end##" 
start="##job.validity.start##"/>
+        </cluster>
+    </clusters>
+
+    <tags/>
+
+    <parallel>1</parallel>
+    <!-- Dir replication needs to run only once to catch up -->
+    <order>LAST_ONLY</order>
+    <frequency>##job.frequency##</frequency>
+    <timezone>##job.timezone##</timezone>
+
+    <properties>
+        <property name="oozie.wf.subworkflow.classpath.inheritance" 
value="true"/>
+    </properties>
+
+    <workflow name="##job.workflow.name##" engine="##job.workflow.engine##"
+              path="##job.workflow.path##" lib="##job.workflow.lib.path##"/>
+    <retry policy="##job.retry.policy##" delay="##job.retry.delay##" 
attempts="3"/>
+    <notification type="##job.notification.type##" 
to="##job.notification.receivers##"/>
+    <ACL/>
+</process>

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/resources/hive-mirroring-template.xml
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/hive-mirroring-template.xml 
b/extensions/src/test/resources/hive-mirroring-template.xml
new file mode 100644
index 0000000..9f28991
--- /dev/null
+++ b/extensions/src/test/resources/hive-mirroring-template.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+
+<process name="##name##" xmlns="uri:falcon:process:0.1">
+    <clusters>
+        <!--  source  -->
+        <cluster name="##cluster.name##">
+            <validity end="##cluster.validity.end##" 
start="##cluster.validity.start##"/>
+        </cluster>
+    </clusters>
+
+    <tags/>
+
+    <parallel>1</parallel>
+    <!-- Replication needs to run only once to catch up -->
+    <order>LAST_ONLY</order>
+    <frequency>##process.frequency##</frequency>
+    <timezone>UTC</timezone>
+
+    <properties>
+        <property name="oozie.wf.subworkflow.classpath.inheritance" 
value="true"/>
+    </properties>
+
+    <workflow name="##workflow.name##" engine="oozie"
+              
path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" 
lib="##workflow.lib.path##"/>
+    <retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/>
+    <notification type="##notification.type##" 
to="##notification.receivers##"/>
+    <ACL/>
+</process>

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/resources/primary-cluster-0.1.xml
----------------------------------------------------------------------
diff --git a/extensions/src/test/resources/primary-cluster-0.1.xml 
b/extensions/src/test/resources/primary-cluster-0.1.xml
new file mode 100644
index 0000000..a9694c2
--- /dev/null
+++ b/extensions/src/test/resources/primary-cluster-0.1.xml
@@ -0,0 +1,44 @@
+<?xml version="1.0"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<cluster colo="gs" description="" name="primaryCluster" 
xmlns="uri:falcon:cluster:0.1"
+        >
+    <interfaces>
+        <interface type="readonly" endpoint="hftp://localhost:50010";
+                   version="0.20.2"/>
+        <interface type="write" endpoint="hdfs://localhost:8020"
+                   version="0.20.2"/>
+        <interface type="execute" endpoint="localhost:8021" version="0.20.2"/>
+        <interface type="workflow" endpoint="http://localhost:11000/oozie/";
+                   version="4.0"/>
+        <interface type="messaging" 
endpoint="tcp://localhost:61616?daemon=true"
+                   version="5.1.6"/>
+        <interface type="registry" endpoint="Hcat" version="1"/>
+    </interfaces>
+    <locations>
+        <location name="staging" path="/projects/falcon/staging"/>
+        <location name="temp" path="/tmp"/>
+        <location name="working" path="/projects/falcon/working"/>
+    </locations>
+    <properties>
+        <property name="field1" value="value1"/>
+        <property name="field2" value="value2"/>
+        <property name="hive.metastore.client.socket.timeout" value="20"/>
+    </properties>
+</cluster>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/oozie/pom.xml
----------------------------------------------------------------------
diff --git a/oozie/pom.xml b/oozie/pom.xml
index c53d33c..04b3df6 100644
--- a/oozie/pom.xml
+++ b/oozie/pom.xml
@@ -67,6 +67,12 @@
 
         <dependency>
             <groupId>org.apache.falcon</groupId>
+            <artifactId>falcon-extensions</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.falcon</groupId>
             <artifactId>falcon-common</artifactId>
         </dependency>
 

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
----------------------------------------------------------------------
diff --git 
a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
 
b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
index c15d6b9..815f5f7 100644
--- 
a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
+++ 
b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java
@@ -29,9 +29,14 @@ import 
org.apache.falcon.entity.v0.cluster.ClusterLocationType;
 import org.apache.falcon.entity.v0.cluster.Interfacetype;
 import org.apache.falcon.hadoop.HadoopClientFactory;
 import org.apache.falcon.util.StartupProperties;
+import org.apache.falcon.extensions.store.ExtensionStore;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -75,16 +80,70 @@ public class SharedLibraryHostingService implements 
ConfigurationChangeListener
         }
     };
 
-    private void addLibsTo(Cluster cluster) throws FalconException {
-        FileSystem fs = null;
+    private void pushExtensionArtifactsToCluster(final Cluster cluster,
+                                                 final FileSystem clusterFs) 
throws FalconException {
+
+        ExtensionStore store = ExtensionStore.get();
+        if (!store.isExtensionStoreInitialized()) {
+            LOG.info("Extension store not initialized by Extension service. 
Make sure Extension service is added in "
+                    + "start up properties");
+            return;
+        }
+
+        Path extensionStorePath = store.getExtensionStorePath();
+        LOG.info("extensionStorePath :{}", extensionStorePath);
+        FileSystem falconFileSystem =
+                
HadoopClientFactory.get().createFalconFileSystem(extensionStorePath.toUri());
+        String nameNode = 
StringUtils.removeEnd(falconFileSystem.getConf().get(HadoopClientFactory
+                .FS_DEFAULT_NAME_KEY), File.separator);
+
+
+        String clusterStorageUrl = 
StringUtils.removeEnd(ClusterHelper.getStorageUrl(cluster), File.separator);
+
+        // If default fs for Falcon server is same as cluster fs abort copy
+        if (nameNode.equalsIgnoreCase(clusterStorageUrl)) {
+            LOG.info("clusterStorageUrl :{} same return", clusterStorageUrl);
+            return;
+        }
+
         try {
-            LOG.info("Initializing FS: {} for cluster: {}", 
ClusterHelper.getStorageUrl(cluster), cluster.getName());
-            fs = 
HadoopClientFactory.get().createFalconFileSystem(ClusterHelper.getConfiguration(cluster));
-            fs.getStatus();
-        } catch (Exception e) {
-            throw new FalconException("Failed to initialize FS for cluster : " 
+ cluster.getName(), e);
+            RemoteIterator<LocatedFileStatus> fileStatusListIterator =
+                    falconFileSystem.listFiles(extensionStorePath, true);
+
+            while (fileStatusListIterator.hasNext()) {
+                LocatedFileStatus srcfileStatus = 
fileStatusListIterator.next();
+                Path filePath = 
Path.getPathWithoutSchemeAndAuthority(srcfileStatus.getPath());
+
+                if (srcfileStatus.isDirectory()) {
+                    if (!clusterFs.exists(filePath)) {
+                        HadoopClientFactory.mkdirs(clusterFs, filePath, 
srcfileStatus.getPermission());
+                    }
+                } else {
+                    if (clusterFs.exists(filePath)) {
+                        FileStatus targetfstat = 
clusterFs.getFileStatus(filePath);
+                        if (targetfstat.getLen() == srcfileStatus.getLen()) {
+                            continue;
+                        }
+                    }
+
+                    Path parentPath = filePath.getParent();
+                    if (!clusterFs.exists(parentPath)) {
+                        FsPermission dirPerm = 
falconFileSystem.getFileStatus(parentPath).getPermission();
+                        HadoopClientFactory.mkdirs(clusterFs, parentPath, 
dirPerm);
+                    }
+
+                    FileUtil.copy(falconFileSystem, srcfileStatus, clusterFs, 
filePath, false, true,
+                            falconFileSystem.getConf());
+                    
FileUtil.chmod(clusterFs.makeQualified(filePath).toString(),
+                            srcfileStatus.getPermission().toString());
+                }
+            }
+        } catch (IOException | InterruptedException e) {
+            throw new FalconException("Failed to copy extension artifacts to 
cluster" + cluster.getName(), e);
         }
+    }
 
+    private void addLibsTo(Cluster cluster, FileSystem fs) throws 
FalconException {
         try {
             Path lib = new Path(ClusterHelper.getLocation(cluster, 
ClusterLocationType.WORKING).getPath(),
                     "lib");
@@ -173,7 +232,8 @@ public class SharedLibraryHostingService implements 
ConfigurationChangeListener
             return;
         }
 
-        addLibsTo(cluster);
+        addLibsTo(cluster, getFilesystem(cluster));
+        pushExtensionArtifactsToCluster(cluster, getFilesystem(cluster));
     }
 
     @Override
@@ -192,7 +252,8 @@ public class SharedLibraryHostingService implements 
ConfigurationChangeListener
                 .equals(ClusterHelper.getInterface(newCluster, 
Interfacetype.WRITE).getEndpoint())
                 || !ClusterHelper.getInterface(oldCluster, 
Interfacetype.WORKFLOW).getEndpoint()
                 .equals(ClusterHelper.getInterface(newCluster, 
Interfacetype.WORKFLOW).getEndpoint())) {
-            addLibsTo(newCluster);
+            addLibsTo(newCluster, getFilesystem(newCluster));
+            pushExtensionArtifactsToCluster(newCluster, 
getFilesystem(newCluster));
         }
     }
 
@@ -204,4 +265,16 @@ public class SharedLibraryHostingService implements 
ConfigurationChangeListener
             LOG.error(e.getMessage(), e);
         }
     }
+
+    private FileSystem getFilesystem(final Cluster cluster) throws 
FalconException {
+        FileSystem fs;
+        try {
+            LOG.info("Initializing FS: {} for cluster: {}", 
ClusterHelper.getStorageUrl(cluster), cluster.getName());
+            fs = 
HadoopClientFactory.get().createFalconFileSystem(ClusterHelper.getConfiguration(cluster));
+            fs.getStatus();
+            return fs;
+        } catch (Exception e) {
+            throw new FalconException("Failed to initialize FS for cluster : " 
+ cluster.getName(), e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d768c6d..a62c030 100644
--- a/pom.xml
+++ b/pom.xml
@@ -301,6 +301,7 @@
                                 <exclude>**/db1.properties</exclude>
                                 <exclude>**/db1.script</exclude>
                                 <exclude>**/credential_provider.jceks</exclude>
+                                <exclude>**/*.json</exclude>
                             </excludes>
                         </configuration>
                         <executions>
@@ -429,6 +430,7 @@
         <module>retention</module>
         <module>archival</module>
         <module>rerun</module>
+        <module>extensions</module>
         <module>prism</module>
         <module>unit</module>
         <module>lifecycle</module>

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/src/main/assemblies/distributed-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/distributed-package.xml 
b/src/main/assemblies/distributed-package.xml
index 2eff638..d58f40b 100644
--- a/src/main/assemblies/distributed-package.xml
+++ b/src/main/assemblies/distributed-package.xml
@@ -115,17 +115,72 @@
             <fileMode>0644</fileMode>
             <directoryMode>0755</directoryMode>
         </fileSet>
+        <fileSet>
+            
<directory>addons/extensions/hdfs-mirroring/src/main/META</directory>
+            <outputDirectory>extensions/hdfs-mirroring/META</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
 
         <fileSet>
-            
<directory>../addons/recipes/hdfs-replication/src/main/resources</directory>
-            <outputDirectory>data-mirroring/hdfs-replication</outputDirectory>
-            <fileMode>0644</fileMode>
+            <directory>./</directory>
+            
<outputDirectory>extensions/hdfs-mirroring/libs/build</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>./</directory>
+            
<outputDirectory>extensions/hdfs-mirroring/libs/runtime</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            
<directory>addons/extensions/hdfs-mirroring/src/main/resources</directory>
+            
<outputDirectory>extensions/hdfs-mirroring/resources</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            
<directory>addons/extensions/hive-mirroring/src/main/META</directory>
+            <outputDirectory>extensions/hive-mirroring/META</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
         </fileSet>
 
         <fileSet>
-            
<directory>../addons/recipes/hive-disaster-recovery/src/main/resources</directory>
-            
<outputDirectory>data-mirroring/hive-disaster-recovery</outputDirectory>
-        <fileMode>0644</fileMode>
+            <directory>./</directory>
+            <fileMode>0755</fileMode>
+            
<outputDirectory>extensions/hive-mirroring/libs/build</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>./</directory>
+            
<outputDirectory>extensions/hive-mirroring/libs/runtime</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            
<directory>addons/extensions/hive-mirroring/src/main/resources</directory>
+            
<outputDirectory>extensions/hive-mirroring/resources</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
         </fileSet>
 
         <fileSet>
@@ -181,6 +236,18 @@
             <outputDirectory>oozie/conf</outputDirectory>
             <fileMode>0644</fileMode>
         </file>
+
+        <file>
+            <source>../addons/extensions/hdfs-mirroring/README</source>
+            <outputDirectory>extensions/hdfs-mirroring</outputDirectory>
+            <fileMode>0755</fileMode>
+        </file>
+
+        <file>
+            <source>../addons/extensions/hive-mirroring/README</source>
+            <outputDirectory>extensions/hive-mirroring</outputDirectory>
+            <fileMode>0755</fileMode>
+        </file>
     </files>
 </assembly>
     

http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/src/main/assemblies/standalone-package.xml
----------------------------------------------------------------------
diff --git a/src/main/assemblies/standalone-package.xml 
b/src/main/assemblies/standalone-package.xml
index bef19ce..eadd972 100644
--- a/src/main/assemblies/standalone-package.xml
+++ b/src/main/assemblies/standalone-package.xml
@@ -121,15 +121,71 @@
         </fileSet>
 
         <fileSet>
-            
<directory>../addons/recipes/hdfs-replication/src/main/resources</directory>
-            <outputDirectory>data-mirroring/hdfs-replication</outputDirectory>
-            <fileMode>0644</fileMode>
+            
<directory>addons/extensions/hdfs-mirroring/src/main/META</directory>
+            <outputDirectory>extensions/hdfs-mirroring/META</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
         </fileSet>
 
         <fileSet>
-            
<directory>../addons/recipes/hive-disaster-recovery/src/main/resources</directory>
-            
<outputDirectory>data-mirroring/hive-disaster-recovery</outputDirectory>
-            <fileMode>0644</fileMode>
+            <directory>./</directory>
+            
<outputDirectory>extensions/hdfs-mirroring/libs/build</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>./</directory>
+            
<outputDirectory>extensions/hdfs-mirroring/libs/runtime</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            
<directory>addons/extensions/hdfs-mirroring/src/main/resources</directory>
+            
<outputDirectory>extensions/hdfs-mirroring/resources</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            
<directory>addons/extensions/hive-mirroring/src/main/META</directory>
+            <outputDirectory>extensions/hive-mirroring/META</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>./</directory>
+            <fileMode>0755</fileMode>
+            
<outputDirectory>extensions/hive-mirroring/libs/build</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            <directory>./</directory>
+            
<outputDirectory>extensions/hive-mirroring/libs/runtime</outputDirectory>
+            <excludes>
+                <exclude>*/**</exclude>
+            </excludes>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
+        </fileSet>
+
+        <fileSet>
+            
<directory>addons/extensions/hive-mirroring/src/main/resources</directory>
+            
<outputDirectory>extensions/hive-mirroring/resources</outputDirectory>
+            <fileMode>0755</fileMode>
+            <directoryMode>0755</directoryMode>
         </fileSet>
     </fileSets>
 
@@ -165,5 +221,17 @@
             <destName>falcon.war</destName>
             <fileMode>0644</fileMode>
         </file>
+
+        <file>
+            <source>../addons/extensions/hdfs-mirroring/README</source>
+            <outputDirectory>extensions/hdfs-mirroring</outputDirectory>
+            <fileMode>0755</fileMode>
+        </file>
+
+        <file>
+            <source>../addons/extensions/hive-mirroring/README</source>
+            <outputDirectory>extensions/hive-mirroring</outputDirectory>
+            <fileMode>0755</fileMode>
+        </file>
     </files>
 </assembly>

Reply via email to