Repository: falcon Updated Branches: refs/heads/master c52961c6a -> 95bf312f4
http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java new file mode 100644 index 0000000..49b3a12 --- /dev/null +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtension.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.extensions.mirroring.hive; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.catalog.AbstractCatalogService; +import org.apache.falcon.catalog.CatalogServiceFactory; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.extensions.AbstractExtension; +import org.apache.falcon.extensions.ExtensionProperties; +import org.apache.falcon.security.SecurityUtil; +import org.apache.hadoop.conf.Configuration; + +import java.util.Properties; + +/** + * Hive mirroring extension. + */ +public class HiveMirroringExtension extends AbstractExtension { + private static final String EXTENSION_NAME = "HIVE-MIRRORING"; + private static final String ALL_TABLES = "*"; + private static final String COMMA_DELIMITER = ","; + private static final String SECURE_RESOURCE = "-secure"; + + @Override + public String getName() { + return EXTENSION_NAME; + } + + @Override + public void validate(final Properties extensionProperties) throws FalconException { + for (HiveMirroringExtensionProperties property : HiveMirroringExtensionProperties.values()) { + if (extensionProperties.getProperty(property.getName()) == null && property.isRequired()) { + throw new FalconException("Missing extension property: " + property.getName()); + } + } + + Cluster srcCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName()); + if (srcCluster == null) { + throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName() + + " not found"); + } + String srcClusterCatalogUrl = ClusterHelper.getRegistryEndPoint(srcCluster); + Configuration srcClusterConf = ClusterHelper.getConfiguration(srcCluster); + + // Validate if DB exists - source and target + String sourceDbList = extensionProperties.getProperty( + HiveMirroringExtensionProperties.SOURCE_DATABASES.getName()); + + if 
(StringUtils.isBlank(sourceDbList)) { + throw new FalconException("No source DB specified for Hive mirroring"); + } + + AbstractCatalogService catalogService = CatalogServiceFactory.getCatalogService(); + String[] srcDbs = sourceDbList.split(COMMA_DELIMITER); + if (srcDbs.length <= 0) { + throw new FalconException("No source DB specified for Hive mirroring"); + } + for (String db : srcDbs) { + if (!catalogService.dbExists(srcClusterConf, srcClusterCatalogUrl, db)) { + throw new FalconException("Database " + db + " doesn't exist on cluster" + srcCluster.getName()); + } + } + + String sourceTableList = extensionProperties.getProperty( + HiveMirroringExtensionProperties.SOURCE_TABLES.getName()); + if (StringUtils.isNotBlank(sourceTableList)) { + if (!sourceTableList.equals(ALL_TABLES)) { + String db = srcDbs[0]; + String[] srcTables = sourceTableList.split(COMMA_DELIMITER); + for (String table : srcTables) { + if (!catalogService.tableExists(srcClusterConf, srcClusterCatalogUrl, db, table)) { + throw new FalconException("Table " + table + " doesn't exist on cluster" + + srcCluster.getName()); + } + } + } + } + + // Verify db exists on target + Cluster targetCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.TARGET_CLUSTER.getName()); + if (targetCluster == null) { + throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.TARGET_CLUSTER.getName() + + " not found"); + } + String targetClusterCatalogUrl = ClusterHelper.getRegistryEndPoint(targetCluster); + Configuration targetClusterConf = ClusterHelper.getConfiguration(targetCluster); + + for (String db : srcDbs) { + if (!catalogService.dbExists(targetClusterConf, targetClusterCatalogUrl, db)) { + throw new FalconException("Database " + db + " doesn't exist on cluster" + targetCluster.getName()); + } + } + } + + @Override + public Properties getAdditionalProperties(final Properties extensionProperties) throws FalconException { + Properties additionalProperties = new Properties(); 
+ + String jobName = extensionProperties.getProperty(ExtensionProperties.JOB_NAME.getName()); + // Add job name as Hive DR job + additionalProperties.put(HiveMirroringExtensionProperties.HIVE_MIRRORING_JOB_NAME.getName(), + jobName + System.currentTimeMillis()); + + // Add required properties of cluster where job should run + additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN.getName(), + extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName())); + Cluster jobCluster = ClusterHelper.getCluster(ExtensionProperties.CLUSTER_NAME.getName()); + if (jobCluster == null) { + throw new FalconException("Cluster entity " + ExtensionProperties.CLUSTER_NAME.getName() + + " not found"); + } + additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_RUN_WRITE_EP.getName(), + ClusterHelper.getStorageUrl(jobCluster)); + if (SecurityUtil.isSecurityEnabled()) { + // Add -secure and update the resource name + String resourceName = getName().toLowerCase() + SECURE_RESOURCE; + additionalProperties.put(ExtensionProperties.RESOURCE_NAME.getName(), resourceName); + additionalProperties.put(HiveMirroringExtensionProperties.CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL.getName(), + ClusterHelper.getPropertyValue(jobCluster, SecurityUtil.NN_PRINCIPAL)); + } + + // Properties for src cluster + Cluster srcCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName()); + if (srcCluster == null) { + throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.SOURCE_CLUSTER.getName() + + " not found"); + } + additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_METASTORE_URI.getName(), + ClusterHelper.getRegistryEndPoint(srcCluster)); + additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_NN.getName(), + ClusterHelper.getStorageUrl(srcCluster)); + + String sourceTableList = extensionProperties.getProperty( + HiveMirroringExtensionProperties.SOURCE_TABLES.getName()); + if 
(StringUtils.isBlank(sourceTableList)) { + additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_TABLES.getName(), ALL_TABLES); + } + + if (SecurityUtil.isSecurityEnabled()) { + String hive2Principal = extensionProperties.getProperty(HiveMirroringExtensionProperties + .SOURCE_HIVE2_KERBEROS_PRINCIPAL.getName()); + if (StringUtils.isBlank(hive2Principal)) { + throw new FalconException("Hive server2 kerberos principal for cluster " + srcCluster.getName() + + "not passed for extension " + jobName); + } + + additionalProperties.put(HiveMirroringExtensionProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName(), + ClusterHelper.getPropertyValue(srcCluster, SecurityUtil.NN_PRINCIPAL)); + additionalProperties.put( + HiveMirroringExtensionProperties.SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName(), + ClusterHelper.getPropertyValue(srcCluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL)); + } + + // Properties for target cluster + Cluster targetCluster = ClusterHelper.getCluster(HiveMirroringExtensionProperties.TARGET_CLUSTER.getName()); + if (targetCluster == null) { + throw new FalconException("Cluster entity " + HiveMirroringExtensionProperties.TARGET_CLUSTER.getName() + + " not found"); + } + additionalProperties.put(HiveMirroringExtensionProperties.TARGET_METASTORE_URI.getName(), + ClusterHelper.getRegistryEndPoint(targetCluster)); + additionalProperties.put(HiveMirroringExtensionProperties.TARGET_NN.getName(), + ClusterHelper.getStorageUrl(targetCluster)); + + if (SecurityUtil.isSecurityEnabled()) { + String hive2Principal = extensionProperties.getProperty(HiveMirroringExtensionProperties + .TARGET_HIVE2_KERBEROS_PRINCIPAL.getName()); + if (StringUtils.isBlank(hive2Principal)) { + throw new FalconException("Hive server2 kerberos principal for cluster " + targetCluster.getName() + + "not passed for extension " + jobName); + } + + additionalProperties.put(HiveMirroringExtensionProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName(), + 
ClusterHelper.getPropertyValue(targetCluster, SecurityUtil.NN_PRINCIPAL)); + additionalProperties.put( + HiveMirroringExtensionProperties.TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL.getName(), + ClusterHelper.getPropertyValue(targetCluster, SecurityUtil.HIVE_METASTORE_KERBEROS_PRINCIPAL)); + } + + // Misc properties + // Add default properties if not passed + String maxEvents = extensionProperties.getProperty(HiveMirroringExtensionProperties.MAX_EVENTS.getName()); + if (StringUtils.isBlank(maxEvents)) { + additionalProperties.put(HiveMirroringExtensionProperties.MAX_EVENTS.getName(), "-1"); + } + + String replicationMaxMaps = + extensionProperties.getProperty(HiveMirroringExtensionProperties.MAX_MAPS.getName()); + if (StringUtils.isBlank(replicationMaxMaps)) { + additionalProperties.put(HiveMirroringExtensionProperties.MAX_MAPS.getName(), "5"); + } + + String distcpMaxMaps = extensionProperties.getProperty( + HiveMirroringExtensionProperties.DISTCP_MAX_MAPS.getName()); + if (StringUtils.isBlank(distcpMaxMaps)) { + additionalProperties.put(HiveMirroringExtensionProperties.DISTCP_MAX_MAPS.getName(), "1"); + } + + String distcpMapBandwidth = extensionProperties.getProperty( + HiveMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName()); + if (StringUtils.isBlank(distcpMapBandwidth)) { + additionalProperties.put(HiveMirroringExtensionProperties.MAP_BANDWIDTH_IN_MB.getName(), "100"); + } + + if (StringUtils.isBlank( + extensionProperties.getProperty(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName()))) { + additionalProperties.put(HiveMirroringExtensionProperties.TDE_ENCRYPTION_ENABLED.getName(), "false"); + } + + return additionalProperties; + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hive/HiveMirroringExtensionProperties.java ---------------------------------------------------------------------- diff --git 
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

/**
 * Hive mirroring extension properties.
 *
 * Each constant pairs a property key (as it appears in the user-supplied
 * properties file / generated job configuration) with a human-readable
 * description and a required/optional flag consumed by
 * {@code HiveMirroringExtension#validate}.
 */
public enum HiveMirroringExtensionProperties {
    SOURCE_CLUSTER("sourceCluster", "Replication source cluster name"),
    SOURCE_METASTORE_URI("sourceMetastoreUri", "Source Hive metastore uri", false),
    SOURCE_HS2_URI("sourceHiveServer2Uri", "Source HS2 uri"),
    SOURCE_DATABASES("sourceDatabases", "List of databases to replicate"),
    SOURCE_TABLES("sourceTables", "List of tables to replicate", false),
    SOURCE_STAGING_PATH("sourceStagingPath", "Location of source staging path", false),
    SOURCE_NN("sourceNN", "Source name node", false),
    SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal", "Source name node kerberos principal", false),
    SOURCE_HIVE_METASTORE_KERBEROS_PRINCIPAL("sourceHiveMetastoreKerberosPrincipal",
            "Source hive metastore kerberos principal", false),
    SOURCE_HIVE2_KERBEROS_PRINCIPAL("sourceHive2KerberosPrincipal",
            "Source hiveserver2 kerberos principal", false),

    TARGET_CLUSTER("targetCluster", "Target cluster name"),
    TARGET_METASTORE_URI("targetMetastoreUri", "Target Hive metastore uri", false),
    TARGET_HS2_URI("targetHiveServer2Uri", "Target HS2 uri"),
    TARGET_STAGING_PATH("targetStagingPath", "Location of target staging path", false),
    TARGET_NN("targetNN", "Target name node", false),
    TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", "Target name node kerberos principal", false),
    TARGET_HIVE_METASTORE_KERBEROS_PRINCIPAL("targetHiveMetastoreKerberosPrincipal",
            "Target hive metastore kerberos principal", false),
    TARGET_HIVE2_KERBEROS_PRINCIPAL("targetHive2KerberosPrincipal",
            "Target hiveserver2 kerberos principal", false),

    MAX_EVENTS("maxEvents", "Maximum events to replicate", false),
    MAX_MAPS("replicationMaxMaps", "Maximum number of maps used during replication", false),
    DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp", false),
    // Optional: the extension defaults this to "100" when not supplied.
    // (Fix: was implicitly required, which made the default unreachable.)
    MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication", false),
    CLUSTER_FOR_JOB_RUN("clusterForJobRun", "Cluster on which replication job runs", false),
    // Fix: key was the human phrase "Job cluster kerberos principal" and the
    // description was copy-pasted from CLUSTER_FOR_JOB_RUN_WRITE_EP.
    CLUSTER_FOR_JOB_NN_KERBEROS_PRINCIPAL("clusterForJobNNKerberosPrincipal",
            "Name node kerberos principal of cluster on which replication job runs", false),
    CLUSTER_FOR_JOB_RUN_WRITE_EP("clusterForJobRunWriteEP", "Write EP of cluster on which replication job runs", false),
    TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Set to true if TDE encryption is enabled", false),
    HIVE_MIRRORING_JOB_NAME("jobName", "Unique hive replication job name", false);

    // Property key as it appears in the properties file / job configuration.
    private final String name;
    // Human-readable description (e.g. for CLI help output).
    private final String description;
    // Whether validation must reject submissions missing this property.
    private final boolean isRequired;

    HiveMirroringExtensionProperties(String name, String description) {
        // Two-arg constants are required by default.
        this(name, description, true);
    }

    HiveMirroringExtensionProperties(String name, String description, boolean isRequired) {
        this.name = name;
        this.description = description;
        this.isRequired = isRequired;
    }

    public String getName() {
        return this.name;
    }

    public String getDescription() {
        return description;
    }

    public boolean isRequired() {
        return isRequired;
    }

    @Override
    public String toString() {
        return getName();
    }
}
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.extensions.store; + +import org.apache.commons.lang.StringUtils; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.RemoteIterator; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; + +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.entity.store.StoreAccessException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.HashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Store for Falcon extensions. 
+ */ +public final class ExtensionStore { + + private static final Logger LOG = LoggerFactory.getLogger(ExtensionStore.class); + private FileSystem fs; + + private Path storePath; + + // Convention over configuration design paradigm + private static final String RESOURCES_DIR= "resources"; + private static final String LIBS_DIR= "libs"; + + private static final String EXTENSION_STORE_URI = "extension.store.uri"; + + private static final ExtensionStore STORE = new ExtensionStore(); + + public static ExtensionStore get() { + return STORE; + } + + private ExtensionStore() { + String uri = StartupProperties.get().getProperty(EXTENSION_STORE_URI); + if (StringUtils.isEmpty(uri)) { + throw new RuntimeException("Property extension.store.uri not set in startup properties." + + "Please set it to path of extension deployment on HDFS. Extension store init failed"); + } + storePath = new Path(uri); + fs = initializeFileSystem(); + } + + private FileSystem initializeFileSystem() { + try { + FileSystem fileSystem = + HadoopClientFactory.get().createFalconFileSystem(storePath.toUri()); + if (!fileSystem.exists(storePath)) { + LOG.info("Creating extension store directory: {}", storePath); + // set permissions so config store dir is owned by falcon alone + HadoopClientFactory.mkdirs(fileSystem, storePath, HadoopClientFactory.ALL_PERMISSION); + } + + return fileSystem; + } catch (Exception e) { + throw new RuntimeException("Unable to bring up extension store for path: " + storePath, e); + } + } + + public Map<String, String> getExtensionArtifacts(final String extensionName) throws StoreAccessException { + Map<String, String> extensionFileMap = new HashMap<>(); + try { + Path extensionPath = new Path(storePath, extensionName.toLowerCase()); + RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(extensionPath, true); + + if (!fileStatusListIterator.hasNext()) { + throw new StoreAccessException(new Exception(" For extension " + extensionName + + " there are no 
artifacts at the extension store path " + storePath)); + } + while (fileStatusListIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusListIterator.next(); + Path filePath = Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()); + extensionFileMap.put(filePath.getName(), filePath.toString()); + } + } catch (IOException e) { + throw new StoreAccessException(e); + } + return extensionFileMap; + } + + public Map<String, String> getExtensionResources(final String extensionName) throws StoreAccessException { + Map<String, String> extensionFileMap = new HashMap<>(); + try { + Path extensionPath = new Path(storePath, extensionName.toLowerCase()); + + Path resourcesPath = null; + FileStatus[] files = fs.listStatus(extensionPath); + + for (FileStatus fileStatus : files) { + if (fileStatus.getPath().getName().equalsIgnoreCase(RESOURCES_DIR)) { + resourcesPath = fileStatus.getPath(); + break; + } + } + + if (resourcesPath == null) { + throw new StoreAccessException(new Exception(" For extension " + extensionName + + " there is no " + RESOURCES_DIR + "at the extension store path " + storePath)); + } + RemoteIterator<LocatedFileStatus> fileStatusListIterator = fs.listFiles(resourcesPath, true); + while (fileStatusListIterator.hasNext()) { + LocatedFileStatus fileStatus = fileStatusListIterator.next(); + Path filePath = Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()); + extensionFileMap.put(filePath.getName(), filePath.toString()); + } + } catch (IOException e) { + throw new StoreAccessException(e); + } + return extensionFileMap; + } + + public String getExtensionLibPath(final String extensionName) throws StoreAccessException { + try { + Path extensionPath = new Path(storePath, extensionName.toLowerCase()); + + Path libsPath = null; + FileStatus[] files = fs.listStatus(extensionPath); + + for (FileStatus fileStatus : files) { + if (fileStatus.getPath().getName().equalsIgnoreCase(LIBS_DIR)) { + if (fileStatus.getLen() != 0) { + libsPath = 
Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()); + } + break; + } + } + + if (libsPath == null) { + LOG.info("For extension " + extensionName + " there is no " + + LIBS_DIR + "at the extension store path " + storePath); + return null; + } else { + return libsPath.toString(); + } + } catch (IOException e) { + throw new StoreAccessException(e); + } + } + + public String getExtensionResource(final String resourcePath) throws StoreAccessException { + if (StringUtils.isBlank(resourcePath)) { + throw new StoreAccessException(new Exception("Resource path cannot be null or empty")); + } + + try { + Path resourceFile = new Path(resourcePath); + + ByteArrayOutputStream writer = new ByteArrayOutputStream(); + InputStream data = fs.open(resourceFile); + IOUtils.copyBytes(data, writer, fs.getConf(), true); + return writer.toString(); + } catch (IOException e) { + throw new StoreAccessException(e); + } + } + + public List<String> getExtensions() throws StoreAccessException { + List<String> extesnionList = new ArrayList<>(); + try { + FileStatus[] fileStatuses = fs.listStatus(storePath); + + for (FileStatus fileStatus : fileStatuses) { + if (fileStatus.isDirectory()) { + Path filePath = Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath()); + extesnionList.add(filePath.getName()); + } + } + } catch (IOException e) { + throw new StoreAccessException(e); + } + return extesnionList; + } + + public Path getExtensionStorePath() { + return storePath; + } + + public boolean isExtensionStoreInitialized() { + return (storePath != null); + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java new file 
mode 100644 index 0000000..92e9805 --- /dev/null +++ b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java @@ -0,0 +1,309 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.extensions.util; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.Frequency; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.process.ACL; +import org.apache.falcon.entity.v0.process.Cluster; +import org.apache.falcon.entity.v0.process.EngineType; +import org.apache.falcon.entity.v0.process.Notification; +import org.apache.falcon.entity.v0.process.PolicyType; +import org.apache.falcon.entity.v0.process.Property; +import org.apache.falcon.entity.v0.process.Retry; +import org.apache.falcon.entity.v0.process.Workflow; +import org.apache.falcon.extensions.ExtensionProperties; +import org.apache.falcon.security.SecurityUtil; +import org.apache.falcon.util.NotificationType; + +import javax.xml.bind.Unmarshaller; +import javax.xml.bind.ValidationEvent; +import 
javax.xml.bind.ValidationEventHandler;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.TimeZone;


/**
 * Extension builder utility.
 *
 * Binds user-supplied extension properties into a process XML template
 * (placeholders of the form ##name##) and verifies no placeholder is left
 * unresolved in the generated process.
 */
public final class ExtensionProcessBuilderUtils {

    // Matches an unresolved template placeholder, e.g. ##falcon.extension.job.name##.
    private static final Pattern EXTENSION_VAR_PATTERN = Pattern.compile("##[A-Za-z0-9_.]*##");

    // Utility class: not instantiable.
    private ExtensionProcessBuilderUtils() {
    }

    /**
     * Builds a process entity by binding the given properties into the template.
     *
     * @param processTemplate process XML template containing ##var## placeholders
     * @param extensionName   name of the extension (used for the workflow name)
     * @param extensionProperties properties to bind into the template
     * @param wfPath          HDFS path of the extension workflow (required)
     * @param wfLibPath       HDFS path of the workflow libs (may be null/empty)
     * @return the bound process entity
     * @throws FalconException on invalid arguments, unmarshalling failure, or
     *                         unresolved placeholders in the generated process
     */
    public static Entity createProcessFromTemplate(final String processTemplate,
                                                   final String extensionName,
                                                   final Properties extensionProperties,
                                                   final String wfPath,
                                                   final String wfLibPath) throws FalconException {
        if (StringUtils.isBlank(processTemplate) || StringUtils.isBlank(extensionName)
                || extensionProperties == null || StringUtils.isBlank(wfPath)) {
            throw new FalconException("Invalid arguments passed to extension builder");
        }
        org.apache.falcon.entity.v0.process.Process process = bindAttributesInTemplate(
                processTemplate, extensionProperties, extensionName, wfPath, wfLibPath);

        // Ensure every ##var## in the template was substituted before returning.
        validateGeneratedProcess(process.toString());
        return process;
    }

    /**
     * Unmarshals the template (schema validation deliberately disabled) and
     * binds name, cluster, scheduling, workflow, retry, notification, ACL,
     * tags and custom properties into it.
     */
    private static org.apache.falcon.entity.v0.process.Process
    bindAttributesInTemplate(final String processTemplate, final Properties extensionProperties,
                             final String extensionName, final String wfPath,
                             final String wfLibPath)
        throws FalconException {
        if (StringUtils.isBlank(processTemplate) || extensionProperties == null) {
            throw new FalconException("Process template or properties cannot be null");
        }

        org.apache.falcon.entity.v0.process.Process process;
        try {
            Unmarshaller unmarshaller = EntityType.PROCESS.getUnmarshaller();
            // Validation can be skipped for unmarshalling as we want to bind template with the properties.
            // Validation is handled as part of marshalling
            unmarshaller.setSchema(null);
            // Swallow all validation events so an un-substituted template still unmarshals.
            unmarshaller.setEventHandler(new ValidationEventHandler() {
                    public boolean handleEvent(ValidationEvent validationEvent) {
                        return true;
                    }
                }
            );
            process = (org.apache.falcon.entity.v0.process.Process)
                    unmarshaller.unmarshal(new StringReader(processTemplate));
        } catch (Exception e) {
            throw new FalconException(e);
        }

        /* For optional properties user might directly set them in the process xml and might not set it in properties
           file. Before doing the submission validation is done to confirm process xml doesn't have
           EXTENSION_VAR_PATTERN
         */

        String processName = extensionProperties.getProperty(ExtensionProperties.JOB_NAME.getName());
        if (StringUtils.isNotEmpty(processName)) {
            process.setName(processName);
        }

        // DR process template has only one cluster
        bindClusterProperties(process.getClusters().getClusters().get(0), extensionProperties);

        // bind scheduling properties
        String processFrequency = extensionProperties.getProperty(ExtensionProperties.FREQUENCY.getName());
        if (StringUtils.isNotEmpty(processFrequency)) {
            process.setFrequency(Frequency.fromString(processFrequency));
        }

        String zone = extensionProperties.getProperty(ExtensionProperties.TIMEZONE.getName());
        if (StringUtils.isNotBlank(zone)) {
            process.setTimezone(TimeZone.getTimeZone(zone));
        } else {
            // Default to UTC when the user supplies no timezone.
            process.setTimezone(TimeZone.getTimeZone("UTC"));
        }

        bindWorkflowProperties(process.getWorkflow(), extensionName, wfPath, wfLibPath);
        bindRetryProperties(process.getRetry(), extensionProperties);
        bindNotificationProperties(process.getNotification(), extensionProperties);
        bindACLProperties(process.getACL(), extensionProperties);
        bindTagsProperties(process, extensionProperties);
        bindCustomProperties(process.getProperties(), extensionProperties);

        return process;
    }

    /** Binds cluster name and validity start/end onto the template's (single) cluster. */
    private static void bindClusterProperties(final Cluster cluster,
                                              final Properties extensionProperties) {
        String clusterName = extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName());
        if (StringUtils.isNotEmpty(clusterName)) {
            cluster.setName(clusterName);
        }
        String clusterStartValidity = extensionProperties.getProperty(ExtensionProperties.VALIDITY_START.getName());
        if (StringUtils.isNotEmpty(clusterStartValidity)) {
            cluster.getValidity().setStart(SchemaHelper.parseDateUTC(clusterStartValidity));
        }

        String clusterEndValidity = extensionProperties.getProperty(ExtensionProperties.VALIDITY_END.getName());
        if (StringUtils.isNotEmpty(clusterEndValidity)) {
            cluster.getValidity().setEnd(SchemaHelper.parseDateUTC(clusterEndValidity));
        }
    }

    /** Sets workflow name ("<extension>-workflow"), engine (Oozie), path and lib path. */
    private static void bindWorkflowProperties(final Workflow wf,
                                               final String extensionName,
                                               final String wfPath,
                                               final String wfLibPath) {
        final EngineType defaultEngineType = EngineType.OOZIE;
        final String workflowNameSuffix = "-workflow";

        wf.setName(extensionName + workflowNameSuffix);
        wf.setEngine(defaultEngineType);
        wf.setPath(wfPath);
        if (StringUtils.isNotEmpty(wfLibPath)) {
            wf.setLib(wfLibPath);
        } else {
            // Empty string (not null) when there is no lib path.
            wf.setLib("");
        }
    }

    /**
     * Binds retry policy/attempts/delay/on-timeout, falling back to defaults
     * (periodic, 3 attempts, 30 min delay, no retry on timeout).
     */
    private static void bindRetryProperties(final Retry processRetry,
                                            final Properties extensionProperties) {
        final PolicyType defaultRetryPolicy = PolicyType.PERIODIC;
        final int defaultRetryAttempts = 3;
        final Frequency defaultRetryDelay = new Frequency("minutes(30)");

        String retryPolicy = extensionProperties.getProperty(ExtensionProperties.RETRY_POLICY.getName());
        if (StringUtils.isNotBlank(retryPolicy)) {
            processRetry.setPolicy(PolicyType.fromValue(retryPolicy));
        } else {
            processRetry.setPolicy(defaultRetryPolicy);
        }

        String retryAttempts = extensionProperties.getProperty(ExtensionProperties.RETRY_ATTEMPTS.getName());
        if (StringUtils.isNotBlank(retryAttempts)) {
            processRetry.setAttempts(Integer.parseInt(retryAttempts));
        } else {
            processRetry.setAttempts(defaultRetryAttempts);
        }

        String retryDelay = extensionProperties.getProperty(ExtensionProperties.RETRY_DELAY.getName());
        if (StringUtils.isNotBlank(retryDelay)) {
            processRetry.setDelay(Frequency.fromString(retryDelay));
        } else {
            processRetry.setDelay(defaultRetryDelay);
        }

        String retryOnTimeout = extensionProperties.getProperty(ExtensionProperties.RETRY_ON_TIMEOUT.getName());
        if (StringUtils.isNotBlank(retryOnTimeout)) {
            processRetry.setOnTimeout(Boolean.valueOf(retryOnTimeout));
        } else {
            processRetry.setOnTimeout(false);
        }
    }

    /** Binds notification type/address, defaulting to email notification to "NA". */
    private static void bindNotificationProperties(final Notification processNotification,
                                                   final Properties extensionProperties) {
        final String defaultNotificationType = NotificationType.EMAIL.getName();

        String notificationType = extensionProperties.getProperty(
                ExtensionProperties.JOB_NOTIFICATION_TYPE.getName());
        if (StringUtils.isNotBlank(notificationType)) {
            processNotification.setType(notificationType);
        } else {
            processNotification.setType(defaultNotificationType);
        }

        String notificationAddress = extensionProperties.getProperty(
                ExtensionProperties.JOB_NOTIFICATION_ADDRESS.getName());
        if (StringUtils.isNotBlank(notificationAddress)) {
            processNotification.setTo(notificationAddress);
        } else {
            processNotification.setTo("NA");
        }
    }

    /**
     * Binds ACL owner/group/permission; a no-op unless authorization is enabled,
     * in which case all three are mandatory.
     * NOTE(review): assumes the template declares an ACL element (acl != null
     * when authorization is on) -- confirm against the shipped templates.
     */
    private static void bindACLProperties(final ACL acl,
                                          final Properties extensionProperties) throws FalconException {
        if (!SecurityUtil.isAuthorizationEnabled()) {
            return;
        }

        String aclowner = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_OWNER.getName());
        if (StringUtils.isNotEmpty(aclowner)) {
            acl.setOwner(aclowner);
        } else {
            throw new FalconException("ACL owner extension property cannot be null or empty when authorization is "
                    + "enabled");
        }

        String aclGroup = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_GROUP.getName());
        if (StringUtils.isNotEmpty(aclGroup)) {
            acl.setGroup(aclGroup);
        } else {
            throw new FalconException("ACL group extension property cannot be null or empty when authorization is "
                    + "enabled");
        }

        String aclPermission = extensionProperties.getProperty(ExtensionProperties.JOB_ACL_PERMISSION.getName());
        if (StringUtils.isNotEmpty(aclPermission)) {
            acl.setPermission(aclPermission);
        } else {
            throw new FalconException("ACL permission extension property cannot be null or empty when authorization is "
                    + "enabled");
        }
    }

    /** Prepends user tags to any Falcon system tags already on the process. */
    private static void bindTagsProperties(final org.apache.falcon.entity.v0.process.Process process,
                                           final Properties extensionProperties) {
        String falconSystemTags = process.getTags();
        String tags = extensionProperties.getProperty(ExtensionProperties.JOB_TAGS.getName());
        if (StringUtils.isNotEmpty(tags)) {
            if (StringUtils.isNotEmpty(falconSystemTags)) {
                tags += ", " + falconSystemTags;
            }
            process.setTags(tags);
        }
    }


    /**
     * Copies every property NOT recognized as a common extension option into
     * the process's custom properties.
     */
    private static void bindCustomProperties(final org.apache.falcon.entity.v0.process.Properties customProperties,
                                             final Properties extensionProperties) {
        List<Property> propertyList = new ArrayList<>();

        for (Map.Entry<Object, Object> extensionProperty : extensionProperties.entrySet()) {
            // Only pass through keys that are not common/known extension options.
            if (ExtensionProperties.getOptionsMap().get(extensionProperty.getKey().toString()) == null) {
                addProperty(propertyList, (String) extensionProperty.getKey(), (String) extensionProperty.getValue());
            }
        }

        customProperties.getProperties().addAll(propertyList);
    }

    /** Appends a name/value Property to the list. */
    private static void addProperty(List<Property> propertyList, String name, String value) {
        Property prop = new Property();
        prop.setName(name);
        prop.setValue(value);
        propertyList.add(prop);
    }

    /**
     * Rejects the generated process if any ##var## placeholder survived binding,
     * naming the first unresolved variable in the error.
     */
    private static void validateGeneratedProcess(final String generatedProcess) throws FalconException {
        if (StringUtils.isBlank(generatedProcess)) {
            throw new IllegalArgumentException("Invalid arguments passed");
        }

        Matcher matcher = EXTENSION_VAR_PATTERN.matcher(generatedProcess);
        if (matcher.find()) {
            String variable = generatedProcess.substring(matcher.start(), matcher.end());
            throw new FalconException("Match not found for the template: " + variable
                    + " in extension template file. Please add it in extension properties file");
        }
    }

}
+ */ +public class ExtensionServiceTest { + + private ExtensionService service; + + @BeforeClass + public void setUp() throws Exception { + service = new ExtensionService(); + service.init(); + } + + @AfterClass + public void tearDown() throws Exception { + service.destroy(); + } + + @Test + public void testGetName() throws Exception { + Assert.assertEquals(service.getName(), ExtensionService.SERVICE_NAME); + } + + @Test + public void testGetextensionStore() throws Exception { + Assert.assertNotNull(ExtensionService.getExtensionStore()); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java new file mode 100644 index 0000000..ffd9336 --- /dev/null +++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.falcon.extensions; + +import junit.framework.Assert; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.EntityUtil; +import org.apache.falcon.entity.parser.EntityParserFactory; +import org.apache.falcon.entity.parser.ProcessEntityParser; +import org.apache.falcon.entity.store.ConfigurationStore; +import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.EntityType; +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.entity.v0.process.EngineType; +import org.apache.falcon.entity.v0.process.PolicyType; +import org.apache.falcon.entity.v0.process.Process; +import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension; +import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtensionProperties; +import org.apache.falcon.extensions.store.AbstractTestExtensionStore; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; +import java.util.Properties; + +/** + * Tests for Extension. 
+ */ +public class ExtensionTest extends AbstractTestExtensionStore { + private static final String PRIMARY_CLUSTER_XML = "/primary-cluster-0.1.xml"; + private static final String BACKUP_CLUSTER_XML = "/backup-cluster-0.1.xml"; + private static final String JOB_NAME = "hdfs-mirroring-monthly"; + private static final String JOB_CLUSTER_NAME = "primaryCluster"; + private static final String VALIDITY_START = "2016-01-02T00:00Z"; + private static final String VALIDITY_END = "2018-01-02T00:00Z"; + private static final String FREQUENCY = "days(1)"; + private static final String SOURCEDIR = "/users/source/file1"; + private static final String SOURCE_CLUSTER = "primaryCluster"; + private static final String TARGETDIR = "/users/target/file1"; + private static final String TARGET_CLUSTER = "backupCluster"; + private Extension extension; + + private static Properties getHdfsProperties() { + Properties properties = new Properties(); + properties.setProperty(ExtensionProperties.JOB_NAME.getName(), + JOB_NAME); + properties.setProperty(ExtensionProperties.CLUSTER_NAME.getName(), + JOB_CLUSTER_NAME); + properties.setProperty(ExtensionProperties.VALIDITY_START.getName(), + VALIDITY_START); + properties.setProperty(ExtensionProperties.VALIDITY_END.getName(), + VALIDITY_END); + properties.setProperty(ExtensionProperties.FREQUENCY.getName(), + FREQUENCY); + properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(), + SOURCEDIR); + properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName(), + SOURCE_CLUSTER); + properties.setProperty(HdfsMirroringExtensionProperties.TARGET_DIR.getName(), + TARGETDIR); + properties.setProperty(HdfsMirroringExtensionProperties.TARGET_CLUSTER.getName(), + TARGET_CLUSTER); + + return properties; + } + + @BeforeClass + public void init() throws Exception { + extension = new Extension(); + initClusters(); + } + + private void initClusters() throws Exception { + InputStream inputStream = 
getClass().getResourceAsStream(PRIMARY_CLUSTER_XML); + Cluster primaryCluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(inputStream); + ConfigurationStore.get().publish(EntityType.CLUSTER, primaryCluster); + + inputStream = getClass().getResourceAsStream(BACKUP_CLUSTER_XML); + Cluster backupCluster = (Cluster) EntityType.CLUSTER.getUnmarshaller().unmarshal(inputStream); + ConfigurationStore.get().publish(EntityType.CLUSTER, backupCluster); + } + + @Test + public void testGetExtensionEntitiesForHdfsMirroring() throws FalconException { + ProcessEntityParser parser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS); + + List<Entity> entities = extension.getEntities(new HdfsMirroringExtension().getName(), getHdfsProperties()); + if (entities == null || entities.isEmpty()) { + Assert.fail("Entities returned cannot be null or empty"); + } + + Assert.assertEquals(1, entities.size()); + Entity entity = entities.get(0); + + + Assert.assertEquals(EntityType.PROCESS, entity.getEntityType()); + parser.parse(new ByteArrayInputStream(entity.toString().getBytes())); + + // Validate + Process processEntity = (Process) entity; + Assert.assertEquals(JOB_NAME, processEntity.getName()); + org.apache.falcon.entity.v0.process.Cluster jobCluster = processEntity.getClusters(). 
+ getClusters().get(0); + Assert.assertEquals(JOB_CLUSTER_NAME, jobCluster.getName()); + Assert.assertEquals(VALIDITY_START, SchemaHelper.formatDateUTC(jobCluster.getValidity().getStart())); + Assert.assertEquals(VALIDITY_END, SchemaHelper.formatDateUTC(jobCluster.getValidity().getEnd())); + + Assert.assertEquals(FREQUENCY, processEntity.getFrequency().toString()); + Assert.assertEquals("UTC", processEntity.getTimezone().getID()); + + Assert.assertEquals(EngineType.OOZIE, processEntity.getWorkflow().getEngine()); + Assert.assertEquals(extensionStorePath + "/hdfs-mirroring/libs", + processEntity.getWorkflow().getLib()); + Assert.assertEquals(extensionStorePath + + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml", + processEntity.getWorkflow().getPath()); + + Properties props = EntityUtil.getEntityProperties(processEntity); + + String srcClusterEndPoint = ClusterHelper.getReadOnlyStorageUrl(ClusterHelper.getCluster(SOURCE_CLUSTER)); + Assert.assertEquals(srcClusterEndPoint + SOURCEDIR, props.getProperty("sourceDir")); + Assert.assertEquals(SOURCE_CLUSTER, props.getProperty("sourceCluster")); + Assert.assertEquals(TARGETDIR, props.getProperty("targetDir")); + Assert.assertEquals(TARGET_CLUSTER, props.getProperty("targetCluster")); + + //retry + Assert.assertEquals(3, processEntity.getRetry().getAttempts()); + Assert.assertEquals(PolicyType.PERIODIC, processEntity.getRetry().getPolicy()); + Assert.assertEquals("minutes(30)", processEntity.getRetry().getDelay().toString()); + } + + @Test(expectedExceptions = FalconException.class, + expectedExceptionsMessageRegExp = "Missing extension property: jobClusterName") + public void testGetExtensionEntitiesForHdfsMirroringMissingMandatoryProperties() throws FalconException { + Properties props = getHdfsProperties(); + props.remove(ExtensionProperties.CLUSTER_NAME.getName()); + + extension.getEntities(new HdfsMirroringExtension().getName(), props); + } +} 
http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java b/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java new file mode 100644 index 0000000..b62b475 --- /dev/null +++ b/extensions/src/test/java/org/apache/falcon/extensions/store/AbstractTestExtensionStore.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.falcon.extensions.store;

import org.apache.commons.io.FileUtils;
import org.apache.falcon.extensions.AbstractExtension;
import org.apache.falcon.extensions.ExtensionService;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.util.StartupProperties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.List;

/**
 * Abstract base class that provisions an on-disk extension store layout
 * (libs/resources directory trees plus template and workflow files) for tests.
 */
public class AbstractTestExtensionStore {

    protected String extensionStorePath;
    protected ExtensionStore store;
    private FileSystem fileSystem;

    @BeforeClass
    public void initConfigStore() throws Exception {
        // Namespace the store path per test class so suites don't collide on disk.
        String configPath = new URI(StartupProperties.get().getProperty("extension.store.uri")).getPath();
        extensionStorePath = configPath + "-" + getClass().getName();
        StartupProperties.get().setProperty("extension.store.uri", extensionStorePath);
        new ExtensionService().init();
        store = ExtensionService.getExtensionStore();
        fileSystem = HadoopClientFactory.get().createFalconFileSystem(new Configuration(true));

        extensionStoreSetup();
    }

    /**
     * Lays out, for every registered extension, the directory structure the
     * store expects and copies in the extension's template file from the
     * test classpath.
     */
    private void extensionStoreSetup() throws IOException {
        List<AbstractExtension> extensions = AbstractExtension.getExtensions();
        for (AbstractExtension extension : extensions) {
            String extensionName = extension.getName().toLowerCase();
            Path extensionPath = new Path(extensionStorePath, extensionName);
            Path libPath = new Path(extensionPath, "libs");
            Path resourcesPath = new Path(extensionPath, "resources");
            HadoopClientFactory.mkdirs(fileSystem, extensionPath,
                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
            // NOTE(review): "README" is created as a directory, not a file --
            // confirm this matches what the store expects.
            HadoopClientFactory.mkdirs(fileSystem, new Path(extensionPath, new Path("README")),
                    HadoopClientFactory.READ_EXECUTE_PERMISSION);

            HadoopClientFactory.mkdirs(fileSystem, libPath,
                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
            HadoopClientFactory.mkdirs(fileSystem, new Path(libPath, "build"),
                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
            HadoopClientFactory.mkdirs(fileSystem, new Path(libPath, "runtime"),
                    HadoopClientFactory.READ_EXECUTE_PERMISSION);

            HadoopClientFactory.mkdirs(fileSystem, resourcesPath,
                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
            HadoopClientFactory.mkdirs(fileSystem, new Path(resourcesPath, "build"),
                    HadoopClientFactory.READ_EXECUTE_PERMISSION);
            Path runTimeResourcePath = new Path(resourcesPath, "runtime");
            HadoopClientFactory.mkdirs(fileSystem, runTimeResourcePath,
                    HadoopClientFactory.READ_EXECUTE_PERMISSION);

            // Close the output streams returned by create(); the original code
            // leaked them, which can leave zero-length files unflushed.
            fileSystem.create(new Path(runTimeResourcePath, extensionName + "-workflow.xml")).close();
            Path dstFile = new Path(runTimeResourcePath, extensionName + "-template.xml");
            fileSystem.create(dstFile).close();
            String srcFile = extensionName + "-template.xml";
            fileSystem.copyFromLocalFile(new Path(getAbsolutePath(srcFile)), dstFile);
        }

    }

    /** Resolves a test-classpath resource name to an absolute filesystem path. */
    private String getAbsolutePath(String fileName) {
        return this.getClass().getResource("/" + fileName).getPath();
    }

    @AfterClass
    public void cleanUp() throws Exception {
        FileUtils.deleteDirectory(new File(extensionStorePath));
    }
}
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.extensions.store; + +import com.google.common.collect.ImmutableMap; +import org.apache.falcon.entity.store.StoreAccessException; +import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.util.Map; + +/** + * Tests for extension store. 
+ */ +public class ExtensionStoreTest extends AbstractTestExtensionStore { + private static Map<String, String> resourcesMap; + + @BeforeClass + public void init() throws Exception { + resourcesMap = ImmutableMap.of( + "hdfs-mirroring-template.xml", extensionStorePath + + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-template.xml", + "hdfs-mirroring-workflow.xml", extensionStorePath + + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml" + ); + } + + @Test + public void testGetExtensionResources() throws StoreAccessException { + String extensionName = new HdfsMirroringExtension().getName(); + Map<String, String> resources = store.getExtensionResources(extensionName); + + for (Map.Entry<String, String> entry : resources.entrySet()) { + String path = resourcesMap.get(entry.getKey()); + Assert.assertEquals(entry.getValue(), path); + } + } + + @Test + public void testGetExtensionLibPath() throws StoreAccessException { + String extensionName = new HdfsMirroringExtension().getName(); + String libPath = extensionStorePath + "/hdfs-mirroring/libs"; + Assert.assertEquals(store.getExtensionLibPath(extensionName), libPath); + } + +} + http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/resources/backup-cluster-0.1.xml ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/backup-cluster-0.1.xml b/extensions/src/test/resources/backup-cluster-0.1.xml new file mode 100644 index 0000000..c3ba6b9 --- /dev/null +++ b/extensions/src/test/resources/backup-cluster-0.1.xml @@ -0,0 +1,44 @@ +<?xml version="1.0"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. 
The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<cluster colo="gs" description="" name="backupCluster" xmlns="uri:falcon:cluster:0.1" + > + <interfaces> + <interface type="readonly" endpoint="hftp://localhost:50010" + version="0.20.2"/> + <interface type="write" endpoint="hdfs://localhost:8020" + version="0.20.2"/> + <interface type="execute" endpoint="localhost:8021" version="0.20.2"/> + <interface type="workflow" endpoint="http://localhost:11000/oozie/" + version="4.0"/> + <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" + version="5.1.6"/> + <interface type="registry" endpoint="Hcat" version="1"/> + </interfaces> + <locations> + <location name="staging" path="/projects/falcon/staging"/> + <location name="temp" path="/tmp"/> + <location name="working" path="/projects/falcon/working"/> + </locations> + <properties> + <property name="field1" value="value1"/> + <property name="field2" value="value2"/> + <property name="hive.metastore.client.socket.timeout" value="20"/> + </properties> +</cluster> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/resources/hdfs-mirroring-template.xml ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/hdfs-mirroring-template.xml b/extensions/src/test/resources/hdfs-mirroring-template.xml new file mode 100644 index 0000000..6c35c5b --- /dev/null +++ 
b/extensions/src/test/resources/hdfs-mirroring-template.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<process name="##job.name##" xmlns="uri:falcon:process:0.1"> + <clusters> + <!-- source --> + <cluster name="##job.cluster.name##"> + <validity end="##job.validity.end##" start="##job.validity.start##"/> + </cluster> + </clusters> + + <tags/> + + <parallel>1</parallel> + <!-- Dir replication needs to run only once to catch up --> + <order>LAST_ONLY</order> + <frequency>##job.frequency##</frequency> + <timezone>##job.timezone##</timezone> + + <properties> + <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/> + </properties> + + <workflow name="##job.workflow.name##" engine="##job.workflow.engine##" + path="##job.workflow.path##" lib="##job.workflow.lib.path##"/> + <retry policy="##job.retry.policy##" delay="##job.retry.delay##" attempts="3"/> + <notification type="##job.notification.type##" to="##job.notification.receivers##"/> + <ACL/> +</process> http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/resources/hive-mirroring-template.xml ---------------------------------------------------------------------- diff --git 
a/extensions/src/test/resources/hive-mirroring-template.xml b/extensions/src/test/resources/hive-mirroring-template.xml new file mode 100644 index 0000000..9f28991 --- /dev/null +++ b/extensions/src/test/resources/hive-mirroring-template.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ --> + +<process name="##name##" xmlns="uri:falcon:process:0.1"> + <clusters> + <!-- source --> + <cluster name="##cluster.name##"> + <validity end="##cluster.validity.end##" start="##cluster.validity.start##"/> + </cluster> + </clusters> + + <tags/> + + <parallel>1</parallel> + <!-- Replication needs to run only once to catch up --> + <order>LAST_ONLY</order> + <frequency>##process.frequency##</frequency> + <timezone>UTC</timezone> + + <properties> + <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/> + </properties> + + <workflow name="##workflow.name##" engine="oozie" + path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/> + <retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/> + <notification type="##notification.type##" to="##notification.receivers##"/> + <ACL/> +</process> http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/extensions/src/test/resources/primary-cluster-0.1.xml ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/primary-cluster-0.1.xml b/extensions/src/test/resources/primary-cluster-0.1.xml new file mode 100644 index 0000000..a9694c2 --- /dev/null +++ b/extensions/src/test/resources/primary-cluster-0.1.xml @@ -0,0 +1,44 @@ +<?xml version="1.0"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. 
You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> + +<cluster colo="gs" description="" name="primaryCluster" xmlns="uri:falcon:cluster:0.1" + > + <interfaces> + <interface type="readonly" endpoint="hftp://localhost:50010" + version="0.20.2"/> + <interface type="write" endpoint="hdfs://localhost:8020" + version="0.20.2"/> + <interface type="execute" endpoint="localhost:8021" version="0.20.2"/> + <interface type="workflow" endpoint="http://localhost:11000/oozie/" + version="4.0"/> + <interface type="messaging" endpoint="tcp://localhost:61616?daemon=true" + version="5.1.6"/> + <interface type="registry" endpoint="Hcat" version="1"/> + </interfaces> + <locations> + <location name="staging" path="/projects/falcon/staging"/> + <location name="temp" path="/tmp"/> + <location name="working" path="/projects/falcon/working"/> + </locations> + <properties> + <property name="field1" value="value1"/> + <property name="field2" value="value2"/> + <property name="hive.metastore.client.socket.timeout" value="20"/> + </properties> +</cluster> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/oozie/pom.xml ---------------------------------------------------------------------- diff --git a/oozie/pom.xml b/oozie/pom.xml index c53d33c..04b3df6 100644 --- a/oozie/pom.xml +++ b/oozie/pom.xml @@ -67,6 +67,12 @@ <dependency> <groupId>org.apache.falcon</groupId> + <artifactId>falcon-extensions</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.falcon</groupId> <artifactId>falcon-common</artifactId> </dependency> 
http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java index c15d6b9..815f5f7 100644 --- a/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java +++ b/oozie/src/main/java/org/apache/falcon/service/SharedLibraryHostingService.java @@ -29,9 +29,14 @@ import org.apache.falcon.entity.v0.cluster.ClusterLocationType; import org.apache.falcon.entity.v0.cluster.Interfacetype; import org.apache.falcon.hadoop.HadoopClientFactory; import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.fs.permission.FsPermission; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,16 +80,70 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener } }; - private void addLibsTo(Cluster cluster) throws FalconException { - FileSystem fs = null; + private void pushExtensionArtifactsToCluster(final Cluster cluster, + final FileSystem clusterFs) throws FalconException { + + ExtensionStore store = ExtensionStore.get(); + if (!store.isExtensionStoreInitialized()) { + LOG.info("Extension store not initialized by Extension service. 
Make sure Extension service is added in " + + "start up properties"); + return; + } + + Path extensionStorePath = store.getExtensionStorePath(); + LOG.info("extensionStorePath :{}", extensionStorePath); + FileSystem falconFileSystem = + HadoopClientFactory.get().createFalconFileSystem(extensionStorePath.toUri()); + String nameNode = StringUtils.removeEnd(falconFileSystem.getConf().get(HadoopClientFactory + .FS_DEFAULT_NAME_KEY), File.separator); + + + String clusterStorageUrl = StringUtils.removeEnd(ClusterHelper.getStorageUrl(cluster), File.separator); + + // If default fs for Falcon server is same as cluster fs abort copy + if (nameNode.equalsIgnoreCase(clusterStorageUrl)) { + LOG.info("clusterStorageUrl :{} same return", clusterStorageUrl); + return; + } + try { - LOG.info("Initializing FS: {} for cluster: {}", ClusterHelper.getStorageUrl(cluster), cluster.getName()); - fs = HadoopClientFactory.get().createFalconFileSystem(ClusterHelper.getConfiguration(cluster)); - fs.getStatus(); - } catch (Exception e) { - throw new FalconException("Failed to initialize FS for cluster : " + cluster.getName(), e); + RemoteIterator<LocatedFileStatus> fileStatusListIterator = + falconFileSystem.listFiles(extensionStorePath, true); + + while (fileStatusListIterator.hasNext()) { + LocatedFileStatus srcfileStatus = fileStatusListIterator.next(); + Path filePath = Path.getPathWithoutSchemeAndAuthority(srcfileStatus.getPath()); + + if (srcfileStatus.isDirectory()) { + if (!clusterFs.exists(filePath)) { + HadoopClientFactory.mkdirs(clusterFs, filePath, srcfileStatus.getPermission()); + } + } else { + if (clusterFs.exists(filePath)) { + FileStatus targetfstat = clusterFs.getFileStatus(filePath); + if (targetfstat.getLen() == srcfileStatus.getLen()) { + continue; + } + } + + Path parentPath = filePath.getParent(); + if (!clusterFs.exists(parentPath)) { + FsPermission dirPerm = falconFileSystem.getFileStatus(parentPath).getPermission(); + HadoopClientFactory.mkdirs(clusterFs, parentPath, 
dirPerm); + } + + FileUtil.copy(falconFileSystem, srcfileStatus, clusterFs, filePath, false, true, + falconFileSystem.getConf()); + FileUtil.chmod(clusterFs.makeQualified(filePath).toString(), + srcfileStatus.getPermission().toString()); + } + } + } catch (IOException | InterruptedException e) { + throw new FalconException("Failed to copy extension artifacts to cluster" + cluster.getName(), e); } + } + private void addLibsTo(Cluster cluster, FileSystem fs) throws FalconException { try { Path lib = new Path(ClusterHelper.getLocation(cluster, ClusterLocationType.WORKING).getPath(), "lib"); @@ -173,7 +232,8 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener return; } - addLibsTo(cluster); + addLibsTo(cluster, getFilesystem(cluster)); + pushExtensionArtifactsToCluster(cluster, getFilesystem(cluster)); } @Override @@ -192,7 +252,8 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener .equals(ClusterHelper.getInterface(newCluster, Interfacetype.WRITE).getEndpoint()) || !ClusterHelper.getInterface(oldCluster, Interfacetype.WORKFLOW).getEndpoint() .equals(ClusterHelper.getInterface(newCluster, Interfacetype.WORKFLOW).getEndpoint())) { - addLibsTo(newCluster); + addLibsTo(newCluster, getFilesystem(newCluster)); + pushExtensionArtifactsToCluster(newCluster, getFilesystem(newCluster)); } } @@ -204,4 +265,16 @@ public class SharedLibraryHostingService implements ConfigurationChangeListener LOG.error(e.getMessage(), e); } } + + private FileSystem getFilesystem(final Cluster cluster) throws FalconException { + FileSystem fs; + try { + LOG.info("Initializing FS: {} for cluster: {}", ClusterHelper.getStorageUrl(cluster), cluster.getName()); + fs = HadoopClientFactory.get().createFalconFileSystem(ClusterHelper.getConfiguration(cluster)); + fs.getStatus(); + return fs; + } catch (Exception e) { + throw new FalconException("Failed to initialize FS for cluster : " + cluster.getName(), e); + } + } } 
http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index d768c6d..a62c030 100644 --- a/pom.xml +++ b/pom.xml @@ -301,6 +301,7 @@ <exclude>**/db1.properties</exclude> <exclude>**/db1.script</exclude> <exclude>**/credential_provider.jceks</exclude> + <exclude>**/*.json</exclude> </excludes> </configuration> <executions> @@ -429,6 +430,7 @@ <module>retention</module> <module>archival</module> <module>rerun</module> + <module>extensions</module> <module>prism</module> <module>unit</module> <module>lifecycle</module> http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/src/main/assemblies/distributed-package.xml ---------------------------------------------------------------------- diff --git a/src/main/assemblies/distributed-package.xml b/src/main/assemblies/distributed-package.xml index 2eff638..d58f40b 100644 --- a/src/main/assemblies/distributed-package.xml +++ b/src/main/assemblies/distributed-package.xml @@ -115,17 +115,72 @@ <fileMode>0644</fileMode> <directoryMode>0755</directoryMode> </fileSet> + <fileSet> + <directory>addons/extensions/hdfs-mirroring/src/main/META</directory> + <outputDirectory>extensions/hdfs-mirroring/META</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> <fileSet> - <directory>../addons/recipes/hdfs-replication/src/main/resources</directory> - <outputDirectory>data-mirroring/hdfs-replication</outputDirectory> - <fileMode>0644</fileMode> + <directory>./</directory> + <outputDirectory>extensions/hdfs-mirroring/libs/build</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>./</directory> + <outputDirectory>extensions/hdfs-mirroring/libs/runtime</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + 
<directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>addons/extensions/hdfs-mirroring/src/main/resources</directory> + <outputDirectory>extensions/hdfs-mirroring/resources</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>addons/extensions/hive-mirroring/src/main/META</directory> + <outputDirectory>extensions/hive-mirroring/META</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> </fileSet> <fileSet> - <directory>../addons/recipes/hive-disaster-recovery/src/main/resources</directory> - <outputDirectory>data-mirroring/hive-disaster-recovery</outputDirectory> - <fileMode>0644</fileMode> + <directory>./</directory> + <fileMode>0755</fileMode> + <outputDirectory>extensions/hive-mirroring/libs/build</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>./</directory> + <outputDirectory>extensions/hive-mirroring/libs/runtime</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>addons/extensions/hive-mirroring/src/main/resources</directory> + <outputDirectory>extensions/hive-mirroring/resources</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> </fileSet> <fileSet> @@ -181,6 +236,18 @@ <outputDirectory>oozie/conf</outputDirectory> <fileMode>0644</fileMode> </file> + + <file> + <source>../addons/extensions/hdfs-mirroring/README</source> + <outputDirectory>extensions/hdfs-mirroring</outputDirectory> + <fileMode>0755</fileMode> + </file> + + <file> + <source>../addons/extensions/hive-mirroring/README</source> + <outputDirectory>extensions/hive-mirroring</outputDirectory> + <fileMode>0755</fileMode> + </file> </files> </assembly> 
http://git-wip-us.apache.org/repos/asf/falcon/blob/95bf312f/src/main/assemblies/standalone-package.xml ---------------------------------------------------------------------- diff --git a/src/main/assemblies/standalone-package.xml b/src/main/assemblies/standalone-package.xml index bef19ce..eadd972 100644 --- a/src/main/assemblies/standalone-package.xml +++ b/src/main/assemblies/standalone-package.xml @@ -121,15 +121,71 @@ </fileSet> <fileSet> - <directory>../addons/recipes/hdfs-replication/src/main/resources</directory> - <outputDirectory>data-mirroring/hdfs-replication</outputDirectory> - <fileMode>0644</fileMode> + <directory>addons/extensions/hdfs-mirroring/src/main/META</directory> + <outputDirectory>extensions/hdfs-mirroring/META</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> </fileSet> <fileSet> - <directory>../addons/recipes/hive-disaster-recovery/src/main/resources</directory> - <outputDirectory>data-mirroring/hive-disaster-recovery</outputDirectory> - <fileMode>0644</fileMode> + <directory>./</directory> + <outputDirectory>extensions/hdfs-mirroring/libs/build</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>./</directory> + <outputDirectory>extensions/hdfs-mirroring/libs/runtime</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>addons/extensions/hdfs-mirroring/src/main/resources</directory> + <outputDirectory>extensions/hdfs-mirroring/resources</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>addons/extensions/hive-mirroring/src/main/META</directory> + <outputDirectory>extensions/hive-mirroring/META</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + 
+ <fileSet> + <directory>./</directory> + <fileMode>0755</fileMode> + <outputDirectory>extensions/hive-mirroring/libs/build</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>./</directory> + <outputDirectory>extensions/hive-mirroring/libs/runtime</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>addons/extensions/hive-mirroring/src/main/resources</directory> + <outputDirectory>extensions/hive-mirroring/resources</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> </fileSet> </fileSets> @@ -165,5 +221,17 @@ <destName>falcon.war</destName> <fileMode>0644</fileMode> </file> + + <file> + <source>../addons/extensions/hdfs-mirroring/README</source> + <outputDirectory>extensions/hdfs-mirroring</outputDirectory> + <fileMode>0755</fileMode> + </file> + + <file> + <source>../addons/extensions/hive-mirroring/README</source> + <outputDirectory>extensions/hive-mirroring</outputDirectory> + <fileMode>0755</fileMode> + </file> </files> </assembly>
