Repository: falcon Updated Branches: refs/heads/master 7c0481eac -> aba79aae2
http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java index 1457b06..29fcdb9 100644 --- a/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java +++ b/common/src/main/java/org/apache/falcon/retention/EvictionHelper.java @@ -40,11 +40,14 @@ public final class EvictionHelper { private EvictionHelper(){} public static Pair<Date, Date> getDateRange(String period) throws ELException { - Long duration = (Long) EVALUATOR.evaluate("${" + period + "}", - Long.class, RESOLVER, RESOLVER); + Long duration = evalExpressionToMilliSeconds(period); Date end = new Date(); Date start = new Date(end.getTime() - duration); return Pair.of(start, end); } + public static Long evalExpressionToMilliSeconds(String period) throws ELException { + return (Long) EVALUATOR.evaluate("${" + period + "}", Long.class, RESOLVER, RESOLVER); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/common/src/main/resources/startup.properties b/common/src/main/resources/startup.properties index b7bac73..7a850f8 100644 --- a/common/src/main/resources/startup.properties +++ b/common/src/main/resources/startup.properties @@ -73,7 +73,7 @@ *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory ##### List of shared libraries for Falcon workflows ##### -*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3 +*.shared.libs=activemq-all,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el ##### Workflow Job Execution Completion listeners ##### *.workflow.execution.listeners= http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java ---------------------------------------------------------------------- diff --git a/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java b/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java new file mode 100644 index 0000000..ce40068 --- /dev/null +++ b/common/src/test/java/org/apache/falcon/retention/EvictionHelperTest.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.falcon.retention; + +import org.testng.Assert; +import org.testng.annotations.Test; + +/** + * Tests for EvictionHelper. + */ +public class EvictionHelperTest { + @Test + public void testEvictionHelper() throws Exception { + Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("days(3)").longValue(), 259200000); + Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("days(1)").longValue(), 86400000); + Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("hours(5)").longValue(), 18000000); + Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("minutes(5)").longValue(), 300000); + Assert.assertEquals(EvictionHelper.evalExpressionToMilliSeconds("minutes(1)").longValue(), 60000); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/pom.xml ---------------------------------------------------------------------- diff --git a/extensions/pom.xml b/extensions/pom.xml index 6a0725a..eb2faea 100644 --- a/extensions/pom.xml +++ b/extensions/pom.xml @@ -43,6 +43,30 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <classifier>tests</classifier> + <scope>test</scope> + </dependency> </dependencies> </profile> </profiles> @@ -77,6 +101,11 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </dependency> + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-test-util</artifactId> + <scope>test</scope> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java index 11b3725..24bbb87 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/AbstractExtension.java @@ -20,6 +20,7 @@ package org.apache.falcon.extensions; import org.apache.falcon.FalconException; import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension; +import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension; import org.apache.falcon.extensions.mirroring.hive.HiveMirroringExtension; import java.util.ArrayList; @@ -33,12 +34,14 @@ import java.util.Properties; public abstract class AbstractExtension { private static final List<String> TRUSTED_EXTENSIONS = Arrays.asList( new HdfsMirroringExtension().getName().toUpperCase(), + new HdfsSnapshotMirroringExtension().getName().toUpperCase(), new HiveMirroringExtension().getName().toUpperCase()); private static List<AbstractExtension> extensions = new ArrayList<>(); public static List<AbstractExtension> getExtensions() { if (extensions.isEmpty()) { extensions.add(new HdfsMirroringExtension()); + extensions.add(new HdfsSnapshotMirroringExtension()); extensions.add(new HiveMirroringExtension()); } return extensions; http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java new file mode 100644 index 0000000..f179896 --- /dev/null +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirrorProperties.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.extensions.mirroring.hdfsSnapshot; + +/** + * Hdfs Snapshot Extension properties. + */ +public enum HdfsSnapshotMirrorProperties { + SOURCE_CLUSTER("sourceCluster", "Snapshot replication source cluster", true), + SOURCE_NN("sourceNN", "Snapshot replication source cluster namenode", false), + SOURCE_EXEC_URL("sourceExecUrl", "Snapshot replication source execute endpoint", false), + SOURCE_NN_KERBEROS_PRINCIPAL("sourceNNKerberosPrincipal", + "Snapshot replication source kerberos principal", false), + + SOURCE_SNAPSHOT_DIR("sourceSnapshotDir", "Location of source snapshot path", true), + SOURCE_SNAPSHOT_RETENTION_POLICY("sourceSnapshotRetentionPolicy", "Retention policy for source snapshots", false), + SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT("sourceSnapshotRetentionAgeLimit", + "Delete source snapshots older than this age", true), + SOURCE_SNAPSHOT_RETENTION_NUMBER("sourceSnapshotRetentionNumber", + "Number of latest source snapshots to retain on source", true), + + TARGET_CLUSTER("targetCluster", "Snapshot replication target cluster", true), + TARGET_NN("targetNN", "Snapshot replication target cluster namenode", false), + TARGET_EXEC_URL("targetExecUrl", "Snapshot replication target execute endpoint", false), + TARGET_NN_KERBEROS_PRINCIPAL("targetNNKerberosPrincipal", + "Snapshot replication target kerberos principal", false), + + TARGET_SNAPSHOT_DIR("targetSnapshotDir", "Target Hive metastore uri", true), + TARGET_SNAPSHOT_RETENTION_POLICY("targetSnapshotRetentionPolicy", "Retention policy for target snapshots", false), + TARGET_SNAPSHOT_RETENTION_AGE_LIMIT("targetSnapshotRetentionAgeLimit", + "Delete target snapshots older than this age", true), + TARGET_SNAPSHOT_RETENTION_NUMBER("targetSnapshotRetentionNumber", + "Number of latest target snapshots to retain on source", true), + + DISTCP_MAX_MAPS("distcpMaxMaps", "Maximum number of maps used during distcp", false), + MAP_BANDWIDTH_IN_MB("distcpMapBandwidth", "Bandwidth in MB/s used by each mapper during replication", false), + + TDE_ENCRYPTION_ENABLED("tdeEncryptionEnabled", "Is TDE encryption enabled on source and target", false), + SNAPSHOT_JOB_NAME("snapshotJobName", "Name of snapshot based mirror job", false); + + + private final String name; + private final String description; + private final boolean isRequired; + + HdfsSnapshotMirrorProperties(String name, String description, boolean isRequired) { + this.name = name; + this.description = description; + this.isRequired = isRequired; + } + + public String getName() { + return this.name; + } + + public String getDescription() { + return description; + } + + public boolean isRequired() { + return isRequired; + } + + @Override + public String toString() { + return getName(); + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java new file mode 100644 index 0000000..09cce3b --- /dev/null +++ b/extensions/src/main/java/org/apache/falcon/extensions/mirroring/hdfsSnapshot/HdfsSnapshotMirroringExtension.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.extensions.mirroring.hdfsSnapshot; + +import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconException; +import org.apache.falcon.entity.ClusterHelper; +import org.apache.falcon.entity.v0.cluster.Cluster; +import org.apache.falcon.extensions.AbstractExtension; +import org.apache.falcon.extensions.ExtensionProperties; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.falcon.security.SecurityUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Properties; + +/** + * Hdfs snapshot mirroring extension. + */ +public class HdfsSnapshotMirroringExtension extends AbstractExtension { + private static final Logger LOG = LoggerFactory.getLogger(HdfsSnapshotMirroringExtension.class); + private static final String EXTENSION_NAME = "HDFS-SNAPSHOT-MIRRORING"; + private static final String DEFAULT_RETENTION_POLICY = "delete"; + public static final String EMPTY_KERBEROS_PRINCIPAL = "NA"; + + @Override + public String getName() { + return EXTENSION_NAME; + } + + @Override + public void validate(final Properties extensionProperties) throws FalconException { + for (HdfsSnapshotMirrorProperties option : HdfsSnapshotMirrorProperties.values()) { + if (extensionProperties.getProperty(option.getName()) == null && option.isRequired()) { + throw new FalconException("Missing extension property: " + option.getName()); + } + } + + Cluster sourceCluster = ClusterHelper.getCluster(extensionProperties.getProperty( + HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName())); + if (sourceCluster == null) { + throw new FalconException("SourceCluster entity " + + HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName() + " not found"); + } + Cluster targetCluster = ClusterHelper.getCluster(extensionProperties.getProperty( + HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName())); + if (targetCluster == null) { + throw new FalconException("TargetCluster entity " + + HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName() + " not found"); + } + + Configuration sourceConf = ClusterHelper.getConfiguration(sourceCluster); + Configuration targetConf = ClusterHelper.getConfiguration(targetCluster); + DistributedFileSystem sourceFileSystem = + HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf); + DistributedFileSystem targetFileSystem = + HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf); + + Path sourcePath = new Path(extensionProperties.getProperty( + HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName())); + Path targetPath = new Path(extensionProperties.getProperty( + HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName())); + + // check if source and target path's exist and are snapshot-able + try { + if (sourceFileSystem.exists(sourcePath)) { + if (!isDirSnapshotable(sourceFileSystem, sourcePath)) { + throw new FalconException(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName() + + " " + sourcePath.toString() + " does not allow snapshots."); + } + } else { + throw new FalconException(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName() + + " " + sourcePath.toString() + " does not exist."); + } + if (targetFileSystem.exists(targetPath)) { + if (!isDirSnapshotable(targetFileSystem, targetPath)) { + throw new FalconException(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName() + + " " + targetPath.toString() + " does not allow snapshots."); + } + } else { + throw new FalconException(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName() + + " " + targetPath.toString() + " does not exist."); + } + } catch (IOException e) { + throw new FalconException(e.getMessage(), e); + } + + + } + + private static boolean isDirSnapshotable(DistributedFileSystem hdfs, Path path) throws FalconException { + try { + LOG.debug("HDFS Snapshot extension validating if dir {} is snapshotable.", path.toString()); + SnapshottableDirectoryStatus[] snapshotableDirs = hdfs.getSnapshottableDirListing(); + if (snapshotableDirs != null && snapshotableDirs.length > 0) { + for (SnapshottableDirectoryStatus dir : snapshotableDirs) { + if (dir.getFullPath().toString().equals(path.toString())) { + return true; + } + } + } + return false; + } catch (IOException e) { + LOG.error("Unable to verify if dir {} is snapshot-able. {}", path.toString(), e.getMessage()); + throw new FalconException("Unable to verify if dir " + path.toString() + " is snapshot-able", e); + } + } + + @Override + public Properties getAdditionalProperties(final Properties extensionProperties) throws FalconException { + Properties additionalProperties = new Properties(); + + // Add default properties if not passed + String distcpMaxMaps = extensionProperties.getProperty(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName()); + if (StringUtils.isBlank(distcpMaxMaps)) { + additionalProperties.put(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), "1"); + } + + String distcpMapBandwidth = extensionProperties.getProperty( + HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName()); + if (StringUtils.isBlank(distcpMapBandwidth)) { + additionalProperties.put(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), "100"); + } + + String tdeEnabled = extensionProperties.getProperty( + HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName()); + if (StringUtils.isNotBlank(tdeEnabled) && Boolean.parseBoolean(tdeEnabled)) { + additionalProperties.put(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "true"); + } else { + additionalProperties.put(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "false"); + } + + // Add sourceCluster properties + Cluster sourceCluster = ClusterHelper.getCluster(extensionProperties.getProperty( + HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName())); + if (sourceCluster == null) { + LOG.error("Cluster entity {} not found", HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName()); + throw new FalconException("Cluster entity " + + HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName() + " not found"); + } + additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), + ClusterHelper.getStorageUrl(sourceCluster)); + additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName(), + ClusterHelper.getMREndPoint(sourceCluster)); + String sourceKerberosPrincipal = ClusterHelper.getPropertyValue(sourceCluster, SecurityUtil.NN_PRINCIPAL); + if (StringUtils.isBlank(sourceKerberosPrincipal)) { + sourceKerberosPrincipal = EMPTY_KERBEROS_PRINCIPAL; + } + additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName(), + sourceKerberosPrincipal); + + String sourceRetentionPolicy = extensionProperties.getProperty( + HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName()); + if (StringUtils.isBlank(sourceRetentionPolicy)) { + sourceRetentionPolicy = DEFAULT_RETENTION_POLICY; + } + validateRetentionPolicy(sourceRetentionPolicy); + additionalProperties.put(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName(), + sourceRetentionPolicy); + + // Add targetCluster properties + Cluster targetCluster = ClusterHelper.getCluster(extensionProperties.getProperty( + HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName())); + if (targetCluster == null) { + LOG.error("Cluster entity {} not found", HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName()); + throw new FalconException("Cluster entity " + + HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName() + " not found"); + } + additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_NN.getName(), + ClusterHelper.getStorageUrl(targetCluster)); + additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName(), + ClusterHelper.getMREndPoint(targetCluster)); + String targetKerberosPrincipal = ClusterHelper.getPropertyValue(targetCluster, SecurityUtil.NN_PRINCIPAL); + if (StringUtils.isBlank(targetKerberosPrincipal)) { + targetKerberosPrincipal = EMPTY_KERBEROS_PRINCIPAL; + } + additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName(), + targetKerberosPrincipal); + + String targetRetentionPolicy = extensionProperties.getProperty( + HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName()); + if (StringUtils.isBlank(targetRetentionPolicy)) { + targetRetentionPolicy = DEFAULT_RETENTION_POLICY; + } + validateRetentionPolicy(targetRetentionPolicy); + additionalProperties.put(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName(), + targetRetentionPolicy); + + // Add jobName and jobCluster properties. + String jobName = extensionProperties.getProperty(ExtensionProperties.JOB_NAME.getName()); + if (StringUtils.isBlank(jobName)) { + throw new FalconException("Property " + + ExtensionProperties.JOB_NAME.getName() + " cannot be null"); + } + additionalProperties.put(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName(), jobName); + + String jobClusterName = extensionProperties.getProperty(ExtensionProperties.CLUSTER_NAME.getName()); + Cluster jobCluster = ClusterHelper.getCluster(jobClusterName); + if (jobCluster == null) { + LOG.error("Cluster entity {} not found", ExtensionProperties.CLUSTER_NAME.getName()); + throw new FalconException("Cluster entity " + + ExtensionProperties.CLUSTER_NAME.getName() + " not found"); + } + return additionalProperties; + } + + public static void validateRetentionPolicy(String retentionPolicy) throws FalconException { + if (!retentionPolicy.equalsIgnoreCase("delete")) { + throw new FalconException("Retention policy \"" + retentionPolicy + "\" is invalid"); + } + } +} http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java index 92e9805..9e23894 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/util/ExtensionProcessBuilderUtils.java @@ -44,9 +44,9 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; +import java.util.TimeZone; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.TimeZone; /** http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java index ffd9336..b14d500 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java @@ -20,6 +20,7 @@ package org.apache.falcon.extensions; import junit.framework.Assert; import org.apache.falcon.FalconException; +import org.apache.falcon.cluster.util.MiniHdfsClusterUtil; import org.apache.falcon.entity.ClusterHelper; import org.apache.falcon.entity.EntityUtil; import org.apache.falcon.entity.parser.EntityParserFactory; @@ -34,12 +35,22 @@ import org.apache.falcon.entity.v0.process.PolicyType; import org.apache.falcon.entity.v0.process.Process; import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtension; import org.apache.falcon.extensions.mirroring.hdfs.HdfsMirroringExtensionProperties; +import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties; +import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirroringExtension; import org.apache.falcon.extensions.store.AbstractTestExtensionStore; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.io.ByteArrayInputStream; +import java.io.File; import java.io.InputStream; +import java.nio.file.Files; import java.util.List; import java.util.Properties; @@ -58,9 +69,18 @@ public class ExtensionTest extends AbstractTestExtensionStore { private static final String SOURCE_CLUSTER = "primaryCluster"; private static final String TARGETDIR = "/users/target/file1"; private static final String TARGET_CLUSTER = "backupCluster"; + private static final String NN_URI = "hdfs://localhost:54314"; + private static final String RETENTION_POLICY = "delete"; + private static final String RETENTION_AGE = "mins(5)"; + private static final String RETENTION_NUM = "7"; + private static final String TARGET_KERBEROS_PRINCIPAL = "nn/backup@REALM"; + private Extension extension; + private MiniDFSCluster miniDFSCluster; + private DistributedFileSystem miniDfs; + private File baseDir; - private static Properties getHdfsProperties() { + private static Properties getCommonProperties() { Properties properties = new Properties(); properties.setProperty(ExtensionProperties.JOB_NAME.getName(), JOB_NAME); @@ -72,6 +92,11 @@ public class ExtensionTest extends AbstractTestExtensionStore { VALIDITY_END); properties.setProperty(ExtensionProperties.FREQUENCY.getName(), FREQUENCY); + return properties; + } + + private static Properties getHdfsProperties() { + Properties properties = getCommonProperties(); properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(), SOURCEDIR); properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_CLUSTER.getName(), @@ -84,10 +109,52 @@ public class ExtensionTest extends AbstractTestExtensionStore { return properties; } + private static Properties getHdfsSnapshotExtensionProperties() { + Properties properties = getCommonProperties(); + properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(), + SOURCEDIR); + properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName(), + SOURCE_CLUSTER); + properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_POLICY.getName(), + RETENTION_POLICY); + properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_AGE_LIMIT.getName(), + RETENTION_AGE); + properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_RETENTION_NUMBER.getName(), + RETENTION_NUM); + properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_NN.getName(), + NN_URI); + + properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName(), + TARGETDIR); + properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_CLUSTER.getName(), + TARGET_CLUSTER); + properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_POLICY.getName(), + RETENTION_POLICY); + properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_AGE_LIMIT.getName(), + RETENTION_AGE); + properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_RETENTION_NUMBER.getName(), + RETENTION_NUM); + properties.setProperty(HdfsSnapshotMirrorProperties.TARGET_NN.getName(), + NN_URI); + properties.setProperty(HdfsSnapshotMirrorProperties.DISTCP_MAX_MAPS.getName(), + "5"); + properties.setProperty(HdfsSnapshotMirrorProperties.MAP_BANDWIDTH_IN_MB.getName(), + "100"); + properties.setProperty(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), + "false"); + + return properties; + } + @BeforeClass public void init() throws Exception { extension = new Extension(); + baseDir = Files.createTempDirectory("test_extensions_hdfs").toFile().getAbsoluteFile(); + miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.EXTENSION_TEST_PORT, baseDir); initClusters(); + miniDfs = miniDFSCluster.getFileSystem(); + miniDfs.mkdirs(new Path(SOURCEDIR), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); + miniDfs.mkdirs(new Path(TARGETDIR), new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL)); } private void initClusters() throws Exception { @@ -157,4 +224,100 @@ public class ExtensionTest extends AbstractTestExtensionStore { extension.getEntities(new HdfsMirroringExtension().getName(), props); } + + @Test + public void testGetExtensionEntitiesForHdfsSnapshotMirroring() throws Exception { + ProcessEntityParser parser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS); + + miniDfs.allowSnapshot(new Path(SOURCEDIR)); + miniDfs.allowSnapshot(new Path(TARGETDIR)); + + List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(), + getHdfsSnapshotExtensionProperties()); + if (entities == null || entities.isEmpty()) { + Assert.fail("Entities returned cannot be null or empty"); + } + + Assert.assertEquals(1, entities.size()); + Entity entity = entities.get(0); + Assert.assertEquals(EntityType.PROCESS, entity.getEntityType()); + parser.parse(new ByteArrayInputStream(entity.toString().getBytes())); + + // Validate + Process processEntity = (Process) entity; + Assert.assertEquals(JOB_NAME, processEntity.getName()); + org.apache.falcon.entity.v0.process.Cluster jobCluster = processEntity.getClusters(). + getClusters().get(0); + Assert.assertEquals(JOB_CLUSTER_NAME, jobCluster.getName()); + Assert.assertEquals(VALIDITY_START, SchemaHelper.formatDateUTC(jobCluster.getValidity().getStart())); + Assert.assertEquals(VALIDITY_END, SchemaHelper.formatDateUTC(jobCluster.getValidity().getEnd())); + + Assert.assertEquals(FREQUENCY, processEntity.getFrequency().toString()); + Assert.assertEquals("UTC", processEntity.getTimezone().getID()); + + Assert.assertEquals(EngineType.OOZIE, processEntity.getWorkflow().getEngine()); + Assert.assertEquals(extensionStorePath + "/hdfs-snapshot-mirroring/libs", + processEntity.getWorkflow().getLib()); + Assert.assertEquals(extensionStorePath + + "/hdfs-snapshot-mirroring/resources/runtime/hdfs-snapshot-mirroring-workflow.xml", + processEntity.getWorkflow().getPath()); + + Properties props = EntityUtil.getEntityProperties(processEntity); + + Assert.assertEquals(SOURCEDIR, props.getProperty("sourceSnapshotDir")); + Assert.assertEquals(SOURCE_CLUSTER, props.getProperty("sourceCluster")); + Assert.assertEquals(TARGETDIR, props.getProperty("targetSnapshotDir")); + Assert.assertEquals(TARGET_CLUSTER, props.getProperty("targetCluster")); + Assert.assertEquals(JOB_NAME, props.getProperty("snapshotJobName")); + Assert.assertEquals(HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL, + props.getProperty("sourceNNKerberosPrincipal")); + Assert.assertEquals(TARGET_KERBEROS_PRINCIPAL, props.getProperty("targetNNKerberosPrincipal")); + + //retry + Assert.assertEquals(3, processEntity.getRetry().getAttempts()); + Assert.assertEquals(PolicyType.PERIODIC, processEntity.getRetry().getPolicy()); + Assert.assertEquals("minutes(30)", processEntity.getRetry().getDelay().toString()); + } + + + @Test(dependsOnMethods = "testGetExtensionEntitiesForHdfsSnapshotMirroring", + expectedExceptions = FalconException.class, + expectedExceptionsMessageRegExp = "sourceSnapshotDir /users/source/file1 does not allow snapshots.") + public void testHdfsSnapshotMirroringNonSnapshotableDir() throws Exception { + miniDfs.disallowSnapshot(new Path(SOURCEDIR)); + + List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(), + getHdfsSnapshotExtensionProperties()); + if (entities == null || entities.isEmpty()) { + Assert.fail("Entities returned cannot be null or empty"); + } + } + + @Test(expectedExceptions = FalconException.class, + expectedExceptionsMessageRegExp = "Missing extension property: sourceCluster") + public void testGetExtensionEntitiesForHdfsSnapshotMirroringMissingProperties() throws FalconException { + Properties props = getHdfsSnapshotExtensionProperties(); + props.remove(HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName()); + extension.getEntities(new HdfsSnapshotMirroringExtension().getName(), props); + } + + @Test(dependsOnMethods = "testHdfsSnapshotMirroringNonSnapshotableDir", + expectedExceptions = FalconException.class, + expectedExceptionsMessageRegExp = "sourceSnapshotDir /users/source/file1 does not exist.") + public void testHdfsSnapshotMirroringNonExistingDir() throws Exception { + if (miniDfs.exists(new Path(SOURCEDIR))) { + miniDfs.delete(new Path(SOURCEDIR), true); + } + + List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(), + getHdfsSnapshotExtensionProperties()); + if (entities == null || entities.isEmpty()) { + Assert.fail("Entities returned cannot be null or empty"); + } + } + + @AfterClass + public void cleanup() throws Exception { + MiniHdfsClusterUtil.cleanupDfs(miniDFSCluster, baseDir); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java index 3462321..9dbacde 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java @@ -39,7 +39,11 @@ public class ExtensionStoreTest extends AbstractTestExtensionStore { "hdfs-mirroring-template.xml", extensionStorePath + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-template.xml", "hdfs-mirroring-workflow.xml", extensionStorePath - + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml" + + "/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml", + "hdfs-snapshot-mirroring-template.xml", extensionStorePath + + "/hdfs-mirroring/resources/runtime/hdfs-snapshot-mirroring-template.xml", + "hdfs-snapshot-mirroring-workflow.xml", extensionStorePath + + "/hdfs-mirroring/resources/runtime/hdfs-snapshot-mirroring-workflow.xml" ); } http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/resources/backup-cluster-0.1.xml ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/backup-cluster-0.1.xml b/extensions/src/test/resources/backup-cluster-0.1.xml index c3ba6b9..27661be 100644 --- a/extensions/src/test/resources/backup-cluster-0.1.xml +++ b/extensions/src/test/resources/backup-cluster-0.1.xml @@ -22,7 +22,7 @@ <interfaces> <interface type="readonly" endpoint="hftp://localhost:50010" version="0.20.2"/> - <interface type="write" endpoint="hdfs://localhost:8020" + <interface type="write" endpoint="hdfs://localhost:54134" version="0.20.2"/> <interface type="execute" endpoint="localhost:8021" version="0.20.2"/> <interface type="workflow" endpoint="http://localhost:11000/oozie/" @@ -40,5 +40,6 @@ <property name="field1" value="value1"/> <property name="field2" value="value2"/> <property name="hive.metastore.client.socket.timeout" value="20"/> + <property name="dfs.namenode.kerberos.principal" value="nn/backup@REALM"/> </properties> </cluster> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml b/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml new file mode 100644 index 0000000..29131da --- /dev/null +++ b/extensions/src/test/resources/hdfs-snapshot-mirroring-template.xml @@ -0,0 +1,45 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + --> + +<process name="##jobName##" xmlns="uri:falcon:process:0.1"> + <clusters> + <!-- source --> + <cluster name="##jobClusterName##"> + <validity end="##jobValidityEnd##" start="##jobValidityStart##"/> + </cluster> + </clusters> + + <tags/> + + <parallel>1</parallel> + <!-- Replication needs to run only once to catch up --> + <order>LAST_ONLY</order> + <frequency>##jobFrequency##</frequency> + <timezone>##jobTimezone##</timezone> + + <properties> + <property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/> + </properties> + + <workflow name="##jobWorkflowName##" engine="##jobWorkflowEngine##" + path="##jobWorkflowPath##" lib="##jobWorkflowLibPath##"/> + <retry policy="##jobRetryPolicy##" delay="##jobRetryDelay##" attempts="3"/> + <notification type="##jobNotificationType##" to="##jobNotificationReceivers##"/> + <ACL/> +</process> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/extensions/src/test/resources/primary-cluster-0.1.xml ---------------------------------------------------------------------- diff --git a/extensions/src/test/resources/primary-cluster-0.1.xml b/extensions/src/test/resources/primary-cluster-0.1.xml index a9694c2..f42924c 100644 --- a/extensions/src/test/resources/primary-cluster-0.1.xml +++ b/extensions/src/test/resources/primary-cluster-0.1.xml @@ -22,7 +22,7 @@ <interfaces> <interface type="readonly" endpoint="hftp://localhost:50010" version="0.20.2"/> - <interface type="write" endpoint="hdfs://localhost:8020" + <interface type="write" endpoint="hdfs://localhost:54134" version="0.20.2"/> <interface type="execute" endpoint="localhost:8021" version="0.20.2"/> <interface type="workflow" endpoint="http://localhost:11000/oozie/" http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f777dc9..8f4561c 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ <activeByDefault>true</activeByDefault> </activation> <properties> - <hadoop.version>2.6.2</hadoop.version> + <hadoop.version>2.7.1</hadoop.version> </properties> <dependencyManagement> <dependencies> @@ -403,6 +403,41 @@ <module>addons/hivedr</module> </modules> </profile> + + <profile> + <id>hdfs-snapshot-mirroring</id> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-enforcer-plugin</artifactId> + <version>1.3.1</version> + <executions> + <execution> + <id>enforce-property</id> + <goals> + <goal>enforce</goal> + </goals> + <configuration> + <rules> + <requireProperty> + <property>hadoop.version</property> + <regex>^(2.7.*)</regex> + <regexMessage>HDFS Snapshot replication only works with hadoop version >= 2.7.0</regexMessage> + </requireProperty> + </rules> + <fail>true</fail> + </configuration> + </execution> + </executions> + </plugin> + </plugins> + </build> + <modules> + <module>addons/hdfs-snapshot-mirroring</module> + </modules> + </profile> + <profile> <id>adf</id> <modules> @@ -627,6 +662,12 @@ </dependency> <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-extensions</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> <groupId>javax.xml.bind</groupId> <artifactId>jaxb-api</artifactId> <version>2.1</version> http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/scheduler/src/test/resources/startup.properties ---------------------------------------------------------------------- diff --git a/scheduler/src/test/resources/startup.properties b/scheduler/src/test/resources/startup.properties index 6216b70..46031e3 100644 --- a/scheduler/src/test/resources/startup.properties +++ b/scheduler/src/test/resources/startup.properties @@ -84,7 +84,7 @@ debug.libext.process.paths=${falcon.libext} ##### List of shared libraries for Falcon workflows ##### -*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3 +*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el ######### Authentication Properties ######### http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/conf/startup.properties ---------------------------------------------------------------------- diff --git a/src/conf/startup.properties b/src/conf/startup.properties index 3601e22..2f8f514 100644 --- a/src/conf/startup.properties +++ b/src/conf/startup.properties @@ -102,7 +102,7 @@ prism.configstore.listeners=org.apache.falcon.entity.v0.EntityGraph,\ *.broker.impl.class=org.apache.activemq.ActiveMQConnectionFactory ##### List of shared libraries for Falcon workflows ##### -*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3 +*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el ##### Workflow Job Execution Completion listeners ##### *.workflow.execution.listeners=org.apache.falcon.handler.SLAMonitoringHandler http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/main/assemblies/assembly-standalone.xml ---------------------------------------------------------------------- diff --git a/src/main/assemblies/assembly-standalone.xml b/src/main/assemblies/assembly-standalone.xml index cc1486a..d3111b7 100644 --- a/src/main/assemblies/assembly-standalone.xml +++ b/src/main/assemblies/assembly-standalone.xml @@ -172,6 +172,40 @@ </fileSet> <fileSet> + <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/META</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/META</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>./</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/build</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>./</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/runtime</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/resources</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/resources</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> <directory>addons/extensions/hive-mirroring/src/main/META</directory> <outputDirectory>extensions/hive-mirroring/META</outputDirectory> <fileMode>0755</fileMode> @@ -247,6 +281,12 @@ </file> <file> + <source>addons/extensions/hdfs-snapshot-mirroring/README</source> + <outputDirectory>extensions/hdfs-snapshot-mirroring</outputDirectory> + <fileMode>0755</fileMode> + </file> + + <file> <source>addons/extensions/hive-mirroring/README</source> <outputDirectory>extensions/hive-mirroring</outputDirectory> <fileMode>0755</fileMode> http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/main/assemblies/distributed-package.xml ---------------------------------------------------------------------- diff --git a/src/main/assemblies/distributed-package.xml b/src/main/assemblies/distributed-package.xml index 502018d..eb45c6f 100644 --- a/src/main/assemblies/distributed-package.xml +++ b/src/main/assemblies/distributed-package.xml @@ -150,6 +150,40 @@ </fileSet> <fileSet> + <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/META</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/META</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>./</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/build</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>./</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/runtime</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/resources</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/resources</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> <directory>addons/extensions/hive-mirroring/src/main/META</directory> <outputDirectory>extensions/hive-mirroring/META</outputDirectory> <fileMode>0755</fileMode> @@ -250,6 +284,12 @@ </file> <file> + <source>../addons/extensions/hdfs-snapshot-mirroring/README</source> + <outputDirectory>extensions/hdfs-snapshot-mirroring</outputDirectory> + <fileMode>0755</fileMode> + </file> + + <file> <source>../addons/extensions/hive-mirroring/README</source> <outputDirectory>extensions/hive-mirroring</outputDirectory> <fileMode>0755</fileMode> http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/src/main/assemblies/standalone-package.xml ---------------------------------------------------------------------- diff --git a/src/main/assemblies/standalone-package.xml b/src/main/assemblies/standalone-package.xml index eac2e11..0b5c69a 100644 --- a/src/main/assemblies/standalone-package.xml +++ b/src/main/assemblies/standalone-package.xml @@ -155,6 +155,40 @@ </fileSet> <fileSet> + <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/META</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/META</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>./</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/build</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>./</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/libs/runtime</outputDirectory> + <excludes> + <exclude>*/**</exclude> + </excludes> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> + <directory>addons/extensions/hdfs-snapshot-mirroring/src/main/resources</directory> + <outputDirectory>extensions/hdfs-snapshot-mirroring/resources</outputDirectory> + <fileMode>0755</fileMode> + <directoryMode>0755</directoryMode> + </fileSet> + + <fileSet> <directory>addons/extensions/hive-mirroring/src/main/META</directory> <outputDirectory>extensions/hive-mirroring/META</outputDirectory> <fileMode>0755</fileMode> @@ -235,6 +269,12 @@ </file> <file> + <source>../addons/extensions/hdfs-snapshot-mirroring/README</source> + <outputDirectory>extensions/hdfs-snapshot-mirroring</outputDirectory> + <fileMode>0755</fileMode> + </file> + + <file> <source>../addons/extensions/hive-mirroring/README</source> <outputDirectory>extensions/hive-mirroring</outputDirectory> <fileMode>0755</fileMode> http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/test-util/pom.xml ---------------------------------------------------------------------- diff --git a/test-util/pom.xml b/test-util/pom.xml index 5b4a8c8..9f60119 100644 --- a/test-util/pom.xml +++ b/test-util/pom.xml @@ -45,8 +45,14 @@ <dependency> <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-auth</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-hdfs</artifactId> <classifier>tests</classifier> + <scope>compile</scope> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java ---------------------------------------------------------------------- diff --git a/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java b/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java new file mode 100644 index 0000000..e1aee2e --- /dev/null +++ b/test-util/src/main/java/org/apache/falcon/cluster/util/MiniHdfsClusterUtil.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.cluster.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.MiniDFSCluster; + +import java.io.File; + +/** + * Create a local MiniDFS cluster for testing snapshots et al. + */ +public final class MiniHdfsClusterUtil { + + private MiniHdfsClusterUtil() {} + + public static final int EXTENSION_TEST_PORT = 54134; + public static final int SNAPSHOT_EVICTION_TEST_PORT = 54135; + public static final int SNAPSHOT_REPL_TEST_PORT = 54136; + + + public static MiniDFSCluster initMiniDfs(int port, File baseDir) throws Exception { + Configuration conf = new Configuration(); + conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath()); + MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(conf); + builder.nameNodePort(port); + return builder.build(); + } + + public static void cleanupDfs(MiniDFSCluster miniDFSCluster, File baseDir) throws Exception { + miniDFSCluster.shutdown(); + FileUtil.fullyDelete(baseDir); + } + +} http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/unit/pom.xml ---------------------------------------------------------------------- diff --git a/unit/pom.xml b/unit/pom.xml index f1ef463..b233acf 100644 --- a/unit/pom.xml +++ b/unit/pom.xml @@ -29,19 +29,31 @@ <artifactId>falcon-unit</artifactId> - <dependencies> + <profiles> + <profile> + <id>hadoop-2</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <properties> + <hadoop.version>2.7.1</hadoop.version> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </dependency> + </dependencies> + </profile> + </profiles> + <dependencies> <dependency> <groupId>org.apache.falcon</groupId> <artifactId>falcon-common</artifactId> </dependency> <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> - </dependency> - - <dependency> <groupId>org.apache.oozie</groupId> <artifactId>oozie-core</artifactId> <exclusions> http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java ---------------------------------------------------------------------- diff --git a/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java index 3070689..595f75c 100644 --- a/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java +++ b/unit/src/main/java/org/apache/falcon/unit/LocalFalconRPCClientFactory.java @@ -17,10 +17,6 @@ */ package org.apache.falcon.unit; -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.ApplicationClientProtocol; import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest; @@ -45,6 +41,8 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetLabelsToNodesResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest; @@ -71,6 +69,10 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.factories.RpcClientFactory; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; + /** * A Dummy implementation of RpcClientFactory that does not do RPC. * This is required as OozieClient tries to connect to RM via RPC to kill jobs which fails in local mode. @@ -179,6 +181,12 @@ public final class LocalFalconRPCClientFactory implements RpcClientFactory { } @Override + public GetLabelsToNodesResponse getLabelsToNodes(GetLabelsToNodesRequest getLabelsToNodesRequest) + throws YarnException, IOException { + return null; + } + + @Override public GetClusterNodeLabelsResponse getClusterNodeLabels(GetClusterNodeLabelsRequest getClusterNodeLabelsRequest) throws YarnException, IOException { return null; http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/unit/src/main/resources/startup.properties ---------------------------------------------------------------------- diff --git a/unit/src/main/resources/startup.properties b/unit/src/main/resources/startup.properties index 4dfea31..0e404cc 100644 --- a/unit/src/main/resources/startup.properties +++ b/unit/src/main/resources/startup.properties @@ -79,7 +79,7 @@ debug.libext.process.paths=${falcon.libext} ##### List of shared libraries for Falcon workflows ##### -*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3 +*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el ######### Authentication Properties ######### http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/webapp/pom.xml ---------------------------------------------------------------------- diff --git a/webapp/pom.xml b/webapp/pom.xml index dad0581..3582be1 100644 --- a/webapp/pom.xml +++ b/webapp/pom.xml @@ -88,6 +88,16 @@ </dependencies> </profile> <profile> + <id>hdfs-snapshot-mirroring</id> + <dependencies> + <dependency> + <groupId>org.apache.falcon</groupId> + <artifactId>falcon-hdfs-snapshot-mirroring</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </profile> + <profile> <id>adf</id> <dependencies> <dependency> http://git-wip-us.apache.org/repos/asf/falcon/blob/aba79aae/webapp/src/test/resources/startup.properties ---------------------------------------------------------------------- diff --git a/webapp/src/test/resources/startup.properties b/webapp/src/test/resources/startup.properties index 3544f0a..58018f1 100644 --- a/webapp/src/test/resources/startup.properties +++ b/webapp/src/test/resources/startup.properties @@ -87,7 +87,7 @@ debug.libext.process.paths=${falcon.libext} ##### List of shared libraries for Falcon workflows ##### -*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3 +*.shared.libs=activemq-all,ant,geronimo-j2ee-management,jms,json-simple,oozie-client,spring-jms,commons-lang3,commons-el ######### Authentication Properties #########
