Repository: falcon Updated Branches: refs/heads/master b11e0570f -> cec771caa
FALCON-2183 Extension Builder changes to support new user extensions Author: sandeep <[email protected]> Reviewers: @pallavi-rao Closes #298 from sandeepSamudrala/FALCON-2183 and squashes the following commits: 8aacd75 [sandeep] FALCON-2183 Incorporated review comments f3d7268 [sandeep] FALCON-2183 Incorporated review comments 11e7b3f [sandeep] FALCON-2183 Extension Builder changes to support new user extensions 250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon 1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon c065566 [sandeep] reverting last line changes made 1a4dcd2 [sandeep] rebased and resolved the conflicts from master 271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay. a94d4fe [sandeep] rebasing from master 9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/cec771ca Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/cec771ca Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/cec771ca Branch: refs/heads/master Commit: cec771caad366f886a3806eac9e5752273ed6afe Parents: b11e057 Author: sandeep <[email protected]> Authored: Fri Nov 18 16:32:45 2016 +0530 Committer: Pallavi Rao <[email protected]> Committed: Fri Nov 18 16:32:45 2016 +0530 ---------------------------------------------------------------------- .../org/apache/falcon/extensions/Extension.java | 48 +++++++++++++++---- .../falcon/extensions/ExtensionBuilder.java | 29 +++++++++++- .../apache/falcon/extensions/ExtensionTest.java | 49 ++++++++++++++------ .../resource/extensions/ExtensionManager.java | 2 +- 4 files changed, 101 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/cec771ca/extensions/src/main/java/org/apache/falcon/extensions/Extension.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/Extension.java b/extensions/src/main/java/org/apache/falcon/extensions/Extension.java index 6c83fe8..3869718 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/Extension.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/Extension.java @@ -20,13 +20,18 @@ package org.apache.falcon.extensions; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; +import org.apache.falcon.Pair; import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.feed.Schema; import org.apache.falcon.extensions.store.ExtensionStore; import org.apache.falcon.extensions.util.ExtensionProcessBuilderUtils; +import org.apache.openjpa.util.UnsupportedException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Arrays; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Properties; @@ -60,26 +65,33 @@ public class Extension implements ExtensionBuilder { } @Override - public List<Entity> getEntities(final String extensionName, final Properties extensionProperties) + public List<Entity> getEntities(final String extensionName, final InputStream configStream) throws FalconException { if (StringUtils.isBlank(extensionName)) { throw new FalconException("Extension name cannot be null or empty"); } - validateProperties(extensionProperties); + Properties configProperties = new Properties(); + try { + configProperties.load(configStream); + } catch (IOException e) { + LOG.error("Error in reading the config stream"); + throw new FalconException("Error while reading the config stream", e); + } + validateProperties(configProperties); String name = extensionName.toLowerCase(); AbstractExtension extension = ExtensionFactory.getExtensionType(name); if (extension != null) { - extension.validate(extensionProperties); - Properties props = extension.getAdditionalProperties(extensionProperties); + extension.validate(configProperties); + Properties props = extension.getAdditionalProperties(configProperties); if (props != null && !props.isEmpty()) { - extensionProperties.putAll(props); + configProperties.putAll(props); } } ExtensionStore store = ExtensionService.getExtensionStore(); - String resourceName = extensionProperties.getProperty(ExtensionProperties.RESOURCE_NAME.getName()); + String resourceName = configProperties.getProperty(ExtensionProperties.RESOURCE_NAME.getName()); if (StringUtils.isBlank(resourceName)) { resourceName = name; } @@ -92,11 +104,29 @@ public class Extension implements ExtensionBuilder { /* Get Lib path */ String wfLibPath = store.getExtensionLibPath(name); Entity entity = ExtensionProcessBuilderUtils.createProcessFromTemplate(extensionTemplate, - name, extensionProperties, wfPath, wfLibPath); + name, configProperties, wfPath, wfLibPath); if (entity == null) { throw new FalconException("Entity created from the extension template cannot be null"); } LOG.info("Extension processing complete"); - return Arrays.asList(entity); + return Collections.singletonList(entity); + } + + @Override + public void validateExtensionConfig(String extensionName, InputStream extensionConfigStream) + throws FalconException { + Properties configProperties = new Properties(); + try { + configProperties.load(extensionConfigStream); + } catch (IOException e) { + LOG.error("Error in reading the config stream"); + throw new FalconException("Error while reading the config stream", e); + } + validateProperties(configProperties); + } + + @Override + public List<Pair<String, Schema>> getOutputSchemas(String extensionName) throws FalconException { + throw new UnsupportedException("Not yet Implemented"); } } http://git-wip-us.apache.org/repos/asf/falcon/blob/cec771ca/extensions/src/main/java/org/apache/falcon/extensions/ExtensionBuilder.java ---------------------------------------------------------------------- diff --git a/extensions/src/main/java/org/apache/falcon/extensions/ExtensionBuilder.java b/extensions/src/main/java/org/apache/falcon/extensions/ExtensionBuilder.java index bd52ed4..9f043d6 100644 --- a/extensions/src/main/java/org/apache/falcon/extensions/ExtensionBuilder.java +++ b/extensions/src/main/java/org/apache/falcon/extensions/ExtensionBuilder.java @@ -19,14 +19,39 @@ package org.apache.falcon.extensions; import org.apache.falcon.FalconException; +import org.apache.falcon.Pair; import org.apache.falcon.entity.v0.Entity; +import org.apache.falcon.entity.v0.feed.Schema; -import java.util.Properties; +import java.io.InputStream; import java.util.List; /** * Extension interface to be implemented by all trusted and custom extensions. */ public interface ExtensionBuilder { - List<Entity> getEntities(final String extensionName, final Properties extensionProperties) throws FalconException; + + /** + * @param extensionName extension name. + * @param extensionConfigStream stream comprising of the extension properties. + * @return List of the entities that are involved in the extension. + * @throws FalconException + */ + List<Entity> getEntities(final String extensionName, final InputStream extensionConfigStream) + throws FalconException; + + /** + * @param extensionName extension name. + * @param extensionConfigStream Properties supplied will be validated. + * @throws FalconException + */ + void validateExtensionConfig(final String extensionName, final InputStream extensionConfigStream) + throws FalconException; + + /** + * @param extensionName extension name. + * @return List of the feed names along with the schema that the extension has generated if any. + * @throws FalconException + */ + List<Pair<String, Schema>> getOutputSchemas(final String extensionName) throws FalconException; } http://git-wip-us.apache.org/repos/asf/falcon/blob/cec771ca/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java ---------------------------------------------------------------------- diff --git a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java index 3386a31..4763db8 100644 --- a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java +++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java @@ -49,7 +49,11 @@ import org.testng.annotations.Test; import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; import java.io.InputStream; +import java.io.OutputStream; import java.nio.file.Files; import java.util.List; import java.util.Properties; @@ -95,7 +99,7 @@ public class ExtensionTest extends AbstractTestExtensionStore { return properties; } - private static Properties getHdfsProperties() { + private static InputStream getHdfsConfigStream() throws IOException { Properties properties = getCommonProperties(); properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(), SOURCEDIR); @@ -106,10 +110,10 @@ public class ExtensionTest extends AbstractTestExtensionStore { properties.setProperty(HdfsMirroringExtensionProperties.TARGET_CLUSTER.getName(), TARGET_CLUSTER); - return properties; + return getConfigStream(properties, "target/hdfsconfig.properties"); } - private static Properties getHdfsSnapshotExtensionProperties() { + private static InputStream getHdfsSnapshotExtensionConfigStream() throws IOException { Properties properties = getCommonProperties(); properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(), SOURCEDIR); @@ -143,7 +147,16 @@ public class ExtensionTest extends AbstractTestExtensionStore { properties.setProperty(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(), "false"); - return properties; + return getConfigStream(properties, "target/HdfsSnapshotMirror.properties"); + } + + private static InputStream getConfigStream(Properties properties, String pathName) throws IOException { + File file = new File(pathName); + file.delete(); + OutputStream outputStream = new FileOutputStream(pathName); + properties.store(outputStream, null); + outputStream.close(); + return new FileInputStream(pathName); } @BeforeClass @@ -169,10 +182,10 @@ public class ExtensionTest extends AbstractTestExtensionStore { } @Test - public void testGetExtensionEntitiesForHdfsMirroring() throws FalconException { + public void testGetExtensionEntitiesForHdfsMirroring() throws FalconException, IOException { ProcessEntityParser parser = (ProcessEntityParser) EntityParserFactory.getParser(EntityType.PROCESS); - List<Entity> entities = extension.getEntities(new HdfsMirroringExtension().getName(), getHdfsProperties()); + List<Entity> entities = extension.getEntities(new HdfsMirroringExtension().getName(), getHdfsConfigStream()); if (entities == null || entities.isEmpty()) { Assert.fail("Entities returned cannot be null or empty"); } @@ -219,11 +232,14 @@ public class ExtensionTest extends AbstractTestExtensionStore { @Test(expectedExceptions = FalconException.class, expectedExceptionsMessageRegExp = "Missing extension property: jobClusterName") - public void testGetExtensionEntitiesForHdfsMirroringMissingMandatoryProperties() throws FalconException { - Properties props = getHdfsProperties(); + public void testGetExtensionEntitiesForHdfsMirroringMissingMandatoryProperties() + throws FalconException, IOException { + Properties props = new Properties(); + props.load(getHdfsConfigStream()); props.remove(ExtensionProperties.CLUSTER_NAME.getName()); - extension.getEntities(new HdfsMirroringExtension().getName(), props); + extension.getEntities(new HdfsMirroringExtension().getName(), + getConfigStream(props, "target/HdfsMirroringMissingMandatory.properties")); } @Test @@ -234,7 +250,7 @@ public class ExtensionTest extends AbstractTestExtensionStore { miniDfs.allowSnapshot(new Path(TARGETDIR)); List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(), - getHdfsSnapshotExtensionProperties()); + getHdfsSnapshotExtensionConfigStream()); if (entities == null || entities.isEmpty()) { Assert.fail("Entities returned cannot be null or empty"); } @@ -288,7 +304,7 @@ public class ExtensionTest extends AbstractTestExtensionStore { miniDfs.disallowSnapshot(new Path(SOURCEDIR)); List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(), - getHdfsSnapshotExtensionProperties()); + getHdfsSnapshotExtensionConfigStream()); if (entities == null || entities.isEmpty()) { Assert.fail("Entities returned cannot be null or empty"); } @@ -296,10 +312,13 @@ public class ExtensionTest extends AbstractTestExtensionStore { @Test(expectedExceptions = FalconException.class, expectedExceptionsMessageRegExp = "Missing extension property: sourceCluster") - public void testGetExtensionEntitiesForHdfsSnapshotMirroringMissingProperties() throws FalconException { - Properties props = getHdfsSnapshotExtensionProperties(); + public void testGetExtensionEntitiesForHdfsSnapshotMirroringMissingProperties() + throws FalconException, IOException { + Properties props = new Properties(); + props.load(getHdfsSnapshotExtensionConfigStream()); props.remove(HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName()); - extension.getEntities(new HdfsSnapshotMirroringExtension().getName(), props); + extension.getEntities(new HdfsSnapshotMirroringExtension().getName(), + getConfigStream(props, "target/HdfsSnapshotMirroringMissing.propertes")); } @Test(dependsOnMethods = "testHdfsSnapshotMirroringNonSnapshotableDir", @@ -311,7 +330,7 @@ public class ExtensionTest extends AbstractTestExtensionStore { } List<Entity> entities = extension.getEntities(new HdfsSnapshotMirroringExtension().getName(), - getHdfsSnapshotExtensionProperties()); + getHdfsSnapshotExtensionConfigStream()); if (entities == null || entities.isEmpty()) { Assert.fail("Entities returned cannot be null or empty"); } http://git-wip-us.apache.org/repos/asf/falcon/blob/cec771ca/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java index 5669b8f..7fd4de5 100644 --- a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java @@ -483,7 +483,7 @@ public class ExtensionManager extends AbstractSchedulableEntityManager { // get entities for extension job Properties properties = new Properties(); properties.load(request.getInputStream()); - List<Entity> entities = extension.getEntities(extensionName, properties); + List<Entity> entities = extension.getEntities(extensionName, request.getInputStream()); // add tags on extension name and job for (Entity entity : entities) {
