Repository: falcon
Updated Branches:
  refs/heads/master b11e0570f -> cec771caa


FALCON-2183 Extension Builder changes to support new user extensions

Author: sandeep <[email protected]>

Reviewers: @pallavi-rao

Closes #298 from sandeepSamudrala/FALCON-2183 and squashes the following 
commits:

8aacd75 [sandeep] FALCON-2183 Incorporated review comments
f3d7268 [sandeep] FALCON-2183 Incorporated review comments
11e7b3f [sandeep] FALCON-2183 Extension Builder changes to support new user 
extensions
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next 
instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes


Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/cec771ca
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/cec771ca
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/cec771ca

Branch: refs/heads/master
Commit: cec771caad366f886a3806eac9e5752273ed6afe
Parents: b11e057
Author: sandeep <[email protected]>
Authored: Fri Nov 18 16:32:45 2016 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Fri Nov 18 16:32:45 2016 +0530

----------------------------------------------------------------------
 .../org/apache/falcon/extensions/Extension.java | 48 +++++++++++++++----
 .../falcon/extensions/ExtensionBuilder.java     | 29 +++++++++++-
 .../apache/falcon/extensions/ExtensionTest.java | 49 ++++++++++++++------
 .../resource/extensions/ExtensionManager.java   |  2 +-
 4 files changed, 101 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/falcon/blob/cec771ca/extensions/src/main/java/org/apache/falcon/extensions/Extension.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/Extension.java 
b/extensions/src/main/java/org/apache/falcon/extensions/Extension.java
index 6c83fe8..3869718 100644
--- a/extensions/src/main/java/org/apache/falcon/extensions/Extension.java
+++ b/extensions/src/main/java/org/apache/falcon/extensions/Extension.java
@@ -20,13 +20,18 @@ package org.apache.falcon.extensions;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.feed.Schema;
 import org.apache.falcon.extensions.store.ExtensionStore;
 import org.apache.falcon.extensions.util.ExtensionProcessBuilderUtils;
+import org.apache.openjpa.util.UnsupportedException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -60,26 +65,33 @@ public class Extension implements ExtensionBuilder {
     }
 
     @Override
-    public List<Entity> getEntities(final String extensionName, final 
Properties extensionProperties)
+    public List<Entity> getEntities(final String extensionName, final 
InputStream configStream)
         throws FalconException {
         if (StringUtils.isBlank(extensionName)) {
             throw new FalconException("Extension name cannot be null or 
empty");
         }
-        validateProperties(extensionProperties);
+        Properties configProperties = new Properties();
+        try {
+            configProperties.load(configStream);
+        } catch (IOException e) {
+            LOG.error("Error in reading the config stream");
+            throw new FalconException("Error while reading the config stream", 
e);
+        }
+        validateProperties(configProperties);
 
         String name = extensionName.toLowerCase();
         AbstractExtension extension = ExtensionFactory.getExtensionType(name);
         if (extension != null) {
-            extension.validate(extensionProperties);
-            Properties props = 
extension.getAdditionalProperties(extensionProperties);
+            extension.validate(configProperties);
+            Properties props = 
extension.getAdditionalProperties(configProperties);
             if (props != null && !props.isEmpty()) {
-                extensionProperties.putAll(props);
+                configProperties.putAll(props);
             }
         }
 
         ExtensionStore store = ExtensionService.getExtensionStore();
 
-        String resourceName = 
extensionProperties.getProperty(ExtensionProperties.RESOURCE_NAME.getName());
+        String resourceName = 
configProperties.getProperty(ExtensionProperties.RESOURCE_NAME.getName());
         if (StringUtils.isBlank(resourceName)) {
             resourceName = name;
         }
@@ -92,11 +104,29 @@ public class Extension implements ExtensionBuilder {
         /* Get Lib path */
         String wfLibPath = store.getExtensionLibPath(name);
         Entity entity = 
ExtensionProcessBuilderUtils.createProcessFromTemplate(extensionTemplate,
-                name, extensionProperties, wfPath, wfLibPath);
+                name, configProperties, wfPath, wfLibPath);
         if (entity == null) {
             throw new FalconException("Entity created from the extension 
template cannot be null");
         }
         LOG.info("Extension processing complete");
-        return Arrays.asList(entity);
+        return Collections.singletonList(entity);
+    }
+
+    @Override
+    public void validateExtensionConfig(String extensionName, InputStream 
extensionConfigStream)
+        throws FalconException {
+        Properties configProperties = new Properties();
+        try {
+            configProperties.load(extensionConfigStream);
+        } catch (IOException e) {
+            LOG.error("Error in reading the config stream");
+            throw new FalconException("Error while reading the config stream", 
e);
+        }
+        validateProperties(configProperties);
+    }
+
+    @Override
+    public List<Pair<String, Schema>> getOutputSchemas(String extensionName) 
throws FalconException {
+        throw new UnsupportedException("Not yet Implemented");
     }
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/cec771ca/extensions/src/main/java/org/apache/falcon/extensions/ExtensionBuilder.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/main/java/org/apache/falcon/extensions/ExtensionBuilder.java 
b/extensions/src/main/java/org/apache/falcon/extensions/ExtensionBuilder.java
index bd52ed4..9f043d6 100644
--- 
a/extensions/src/main/java/org/apache/falcon/extensions/ExtensionBuilder.java
+++ 
b/extensions/src/main/java/org/apache/falcon/extensions/ExtensionBuilder.java
@@ -19,14 +19,39 @@
 package org.apache.falcon.extensions;
 
 import org.apache.falcon.FalconException;
+import org.apache.falcon.Pair;
 import org.apache.falcon.entity.v0.Entity;
+import org.apache.falcon.entity.v0.feed.Schema;
 
-import java.util.Properties;
+import java.io.InputStream;
 import java.util.List;
 
 /**
  * Extension interface to be implemented by all trusted and custom extensions.
  */
 public interface ExtensionBuilder {
-    List<Entity> getEntities(final String extensionName, final Properties 
extensionProperties) throws FalconException;
+
+    /**
+     * @param extensionName extension name.
+     * @param extensionConfigStream stream comprising of the extension 
properties.
+     * @return List of the entities that are involved in the extension.
+     * @throws FalconException
+     */
+    List<Entity> getEntities(final String extensionName, final InputStream 
extensionConfigStream)
+        throws FalconException;
+
+    /**
+     * @param extensionName extension name.
+     * @param extensionConfigStream Properties supplied will be validated.
+     * @throws FalconException
+     */
+    void validateExtensionConfig(final String extensionName, final InputStream 
extensionConfigStream)
+        throws FalconException;
+
+    /**
+     * @param extensionName extension name.
+     * @return List of the feed names along with the schema that the extension 
has generated if any.
+     * @throws FalconException
+     */
+    List<Pair<String, Schema>> getOutputSchemas(final String extensionName) 
throws FalconException;
 }

http://git-wip-us.apache.org/repos/asf/falcon/blob/cec771ca/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
----------------------------------------------------------------------
diff --git 
a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java 
b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
index 3386a31..4763db8 100644
--- a/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
+++ b/extensions/src/test/java/org/apache/falcon/extensions/ExtensionTest.java
@@ -49,7 +49,11 @@ import org.testng.annotations.Test;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
 import java.nio.file.Files;
 import java.util.List;
 import java.util.Properties;
@@ -95,7 +99,7 @@ public class ExtensionTest extends AbstractTestExtensionStore 
{
         return properties;
     }
 
-    private static Properties getHdfsProperties() {
+    private static InputStream getHdfsConfigStream() throws IOException {
         Properties properties = getCommonProperties();
         
properties.setProperty(HdfsMirroringExtensionProperties.SOURCE_DIR.getName(),
                 SOURCEDIR);
@@ -106,10 +110,10 @@ public class ExtensionTest extends 
AbstractTestExtensionStore {
         
properties.setProperty(HdfsMirroringExtensionProperties.TARGET_CLUSTER.getName(),
                 TARGET_CLUSTER);
 
-        return properties;
+        return getConfigStream(properties, "target/hdfsconfig.properties");
     }
 
-    private static Properties getHdfsSnapshotExtensionProperties() {
+    private static InputStream getHdfsSnapshotExtensionConfigStream() throws 
IOException {
         Properties properties = getCommonProperties();
         
properties.setProperty(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName(),
                 SOURCEDIR);
@@ -143,7 +147,16 @@ public class ExtensionTest extends 
AbstractTestExtensionStore {
         
properties.setProperty(HdfsSnapshotMirrorProperties.TDE_ENCRYPTION_ENABLED.getName(),
                 "false");
 
-        return properties;
+        return getConfigStream(properties, 
"target/HdfsSnapshotMirror.properties");
+    }
+
+    private static InputStream getConfigStream(Properties properties, String 
pathName) throws IOException {
+        File file = new File(pathName);
+        file.delete();
+        OutputStream outputStream = new FileOutputStream(pathName);
+        properties.store(outputStream, null);
+        outputStream.close();
+        return new FileInputStream(pathName);
     }
 
     @BeforeClass
@@ -169,10 +182,10 @@ public class ExtensionTest extends 
AbstractTestExtensionStore {
     }
 
     @Test
-    public void testGetExtensionEntitiesForHdfsMirroring() throws 
FalconException {
+    public void testGetExtensionEntitiesForHdfsMirroring() throws 
FalconException, IOException {
         ProcessEntityParser parser = (ProcessEntityParser) 
EntityParserFactory.getParser(EntityType.PROCESS);
 
-        List<Entity> entities = extension.getEntities(new 
HdfsMirroringExtension().getName(), getHdfsProperties());
+        List<Entity> entities = extension.getEntities(new 
HdfsMirroringExtension().getName(), getHdfsConfigStream());
         if (entities == null || entities.isEmpty()) {
             Assert.fail("Entities returned cannot be null or empty");
         }
@@ -219,11 +232,14 @@ public class ExtensionTest extends 
AbstractTestExtensionStore {
 
     @Test(expectedExceptions = FalconException.class,
             expectedExceptionsMessageRegExp = "Missing extension property: 
jobClusterName")
-    public void 
testGetExtensionEntitiesForHdfsMirroringMissingMandatoryProperties() throws 
FalconException {
-        Properties props = getHdfsProperties();
+    public void 
testGetExtensionEntitiesForHdfsMirroringMissingMandatoryProperties()
+        throws FalconException, IOException {
+        Properties props = new Properties();
+        props.load(getHdfsConfigStream());
         props.remove(ExtensionProperties.CLUSTER_NAME.getName());
 
-        extension.getEntities(new HdfsMirroringExtension().getName(), props);
+        extension.getEntities(new HdfsMirroringExtension().getName(),
+                getConfigStream(props, 
"target/HdfsMirroringMissingMandatory.properties"));
     }
 
     @Test
@@ -234,7 +250,7 @@ public class ExtensionTest extends 
AbstractTestExtensionStore {
         miniDfs.allowSnapshot(new Path(TARGETDIR));
 
         List<Entity> entities = extension.getEntities(new 
HdfsSnapshotMirroringExtension().getName(),
-                getHdfsSnapshotExtensionProperties());
+                getHdfsSnapshotExtensionConfigStream());
         if (entities == null || entities.isEmpty()) {
             Assert.fail("Entities returned cannot be null or empty");
         }
@@ -288,7 +304,7 @@ public class ExtensionTest extends 
AbstractTestExtensionStore {
         miniDfs.disallowSnapshot(new Path(SOURCEDIR));
 
         List<Entity> entities = extension.getEntities(new 
HdfsSnapshotMirroringExtension().getName(),
-                getHdfsSnapshotExtensionProperties());
+                getHdfsSnapshotExtensionConfigStream());
         if (entities == null || entities.isEmpty()) {
             Assert.fail("Entities returned cannot be null or empty");
         }
@@ -296,10 +312,13 @@ public class ExtensionTest extends 
AbstractTestExtensionStore {
 
     @Test(expectedExceptions = FalconException.class,
             expectedExceptionsMessageRegExp = "Missing extension property: 
sourceCluster")
-    public void 
testGetExtensionEntitiesForHdfsSnapshotMirroringMissingProperties() throws 
FalconException {
-        Properties props = getHdfsSnapshotExtensionProperties();
+    public void 
testGetExtensionEntitiesForHdfsSnapshotMirroringMissingProperties()
+        throws FalconException, IOException {
+        Properties props = new Properties();
+        props.load(getHdfsSnapshotExtensionConfigStream());
         props.remove(HdfsSnapshotMirrorProperties.SOURCE_CLUSTER.getName());
-        extension.getEntities(new HdfsSnapshotMirroringExtension().getName(), 
props);
+        extension.getEntities(new HdfsSnapshotMirroringExtension().getName(),
+                getConfigStream(props, 
"target/HdfsSnapshotMirroringMissing.propertes"));
     }
 
     @Test(dependsOnMethods = "testHdfsSnapshotMirroringNonSnapshotableDir",
@@ -311,7 +330,7 @@ public class ExtensionTest extends 
AbstractTestExtensionStore {
         }
 
         List<Entity> entities = extension.getEntities(new 
HdfsSnapshotMirroringExtension().getName(),
-                getHdfsSnapshotExtensionProperties());
+                getHdfsSnapshotExtensionConfigStream());
         if (entities == null || entities.isEmpty()) {
             Assert.fail("Entities returned cannot be null or empty");
         }

http://git-wip-us.apache.org/repos/asf/falcon/blob/cec771ca/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
----------------------------------------------------------------------
diff --git 
a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
 
b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
index 5669b8f..7fd4de5 100644
--- 
a/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
+++ 
b/prism/src/main/java/org/apache/falcon/resource/extensions/ExtensionManager.java
@@ -483,7 +483,7 @@ public class ExtensionManager extends 
AbstractSchedulableEntityManager {
         // get entities for extension job
         Properties properties = new Properties();
         properties.load(request.getInputStream());
-        List<Entity> entities = extension.getEntities(extensionName, 
properties);
+        List<Entity> entities = extension.getEntities(extensionName, 
request.getInputStream());
 
         // add tags on extension name and job
         for (Entity entity : entities) {

Reply via email to