Repository: falcon
Updated Branches:
refs/heads/master 993861c85 -> 0825d80a6
FALCON-2267 Definition api fails if resources are empty
This is extension of sandeepSamudrala's work on PR 353. Completing it on his
behalf as he is out on vacation.
Dev testing done:
`IM1738M1:falcon-0.11-SNAPSHOT pallavi.rao$ bin/falcon extension -enumerate
[
{
"name": "sample",
"type": "Custom extension",
"location": "hdfs://192.168.138.236:8020/tmp/extensions/extension-example"
},
â¦..
{
"name": "hive-mirroring",
"type": "Trusted extension",
"description": "This extension implements replicating hive metadata and
data from one Hadoop cluster to another Hadoop cluster.",
"location":
"file:/Users/pallavi.rao/falcon/falcon-0.11-SNAPSHOT/extensions/hive-mirroring"
}
]
IM1738M1:falcon-0.11-SNAPSHOT pallavi.rao$ bin/falcon extension -definition
-extensionName hive-mirroring
{
"shortDescription":"This extension implements replicating hive metadata and
data from one Hadoop cluster to another Hadoop cluster.",
"properties":[
{
"propertyName":"jobName",
"required":true,
"description":"Unique job name",
"example":"hive-monthly-sales-dr"
},
â¦.
]
}
IM1738M1:falcon-0.11-SNAPSHOT pallavi.rao$ bin/falcon extension -definition
-extensionName sample
Contents of file config:
Contents of file config2:
<workflow-app xmlns="uri:oozie:workflow:0.1" name="merlin-workflow">
â¦.
IM1738M1:falcon-0.11-SNAPSHOT pallavi.rao$ bin/falcon extension -describe
-extensionName sample
Extension Test
IM1738M1:falcon-0.11-SNAPSHOT pallavi.rao$ bin/falcon extension -describe
-extensionName hive-mirroring
.....
Hive Mirroring Extension
Overview
Falcon provides feature to replicate Hive metadata and data events from source
cluster to destination cluster.
â¦..`
Author: Pallavi Rao <[email protected]>
Reviewers: @PracheerAgarwal, @sandeepSamudrala
Closes #356 from pallavi-rao/2267
Project: http://git-wip-us.apache.org/repos/asf/falcon/repo
Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/0825d80a
Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/0825d80a
Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/0825d80a
Branch: refs/heads/master
Commit: 0825d80a638652bc68119425e910f969602c83c6
Parents: 993861c
Author: Pallavi Rao <[email protected]>
Authored: Thu Feb 2 17:39:23 2017 +0530
Committer: Pallavi Rao <[email protected]>
Committed: Thu Feb 2 17:39:23 2017 +0530
----------------------------------------------------------------------
.../apache/falcon/cli/FalconExtensionCLI.java | 1 -
.../falcon/extensions/store/ExtensionStore.java | 87 ++++++++-----------
.../extensions/store/ExtensionStoreTest.java | 5 +-
.../resource/AbstractExtensionManager.java | 11 +--
.../resource/proxy/ExtensionManagerProxy.java | 89 ++++++++++----------
5 files changed, 86 insertions(+), 107 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/falcon/blob/0825d80a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
----------------------------------------------------------------------
diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
index 5d44128..293df23 100644
--- a/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
+++ b/cli/src/main/java/org/apache/falcon/cli/FalconExtensionCLI.java
@@ -92,7 +92,6 @@ public class FalconExtensionCLI extends FalconCLI{
} else if (optionsList.contains(DEFINITION_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result = client.getExtensionDefinition(extensionName).getMessage();
- result = prettyPrintJson(result);
} else if (optionsList.contains(DESCRIBE_OPT)) {
validateRequiredParameter(extensionName, EXTENSION_NAME_OPT);
result =
client.getExtensionDescription(extensionName).getMessage();
http://git-wip-us.apache.org/repos/asf/falcon/blob/0825d80a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
----------------------------------------------------------------------
diff --git
a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
index 1d71651..ed42b6b 100644
---
a/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
+++
b/extensions/src/main/java/org/apache/falcon/extensions/store/ExtensionStore.java
@@ -18,6 +18,15 @@
package org.apache.falcon.extensions.store;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.parser.ValidationException;
@@ -41,16 +50,6 @@ import org.codehaus.jettison.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
/**
* Store for Falcon extensions.
*/
@@ -114,8 +113,9 @@ public final class ExtensionStore {
}
private String getShortDescription(final String extensionName) throws
FalconException {
- String content = getResource(extensionName, extensionName.toLowerCase()
- + EXTENSION_PROPERTY_JSON_SUFFIX);
+ String location = storePath.toString() + "/" + extensionName + "/META/"
+ + extensionName.toLowerCase() + EXTENSION_PROPERTY_JSON_SUFFIX;
+ String content = getResource(location);
String description;
try {
JSONObject jsonObject = new JSONObject(content);
@@ -141,40 +141,6 @@ public final class ExtensionStore {
}
}
- private Map<String, String> getExtensionArtifacts(final String
extensionName) throws
- FalconException {
- Map<String, String> extensionFileMap = new HashMap<>();
- Path extensionPath;
- try {
- RemoteIterator<LocatedFileStatus> fileStatusListIterator;
- if (AbstractExtension.isExtensionTrusted(extensionName)) {
- extensionPath = new Path(storePath,
extensionName.toLowerCase());
- fileStatusListIterator = fs.listFiles(extensionPath, true);
- } else {
- ExtensionBean extensionBean =
metaStore.getDetail(extensionName);
- if (null == extensionBean) {
- throw new StoreAccessException("Extension not found:" +
extensionName);
- }
- extensionPath = new Path(extensionBean.getLocation());
- FileSystem fileSystem =
getHdfsFileSystem(extensionBean.getLocation());
- fileStatusListIterator = fileSystem.listFiles(extensionPath,
true);
- }
-
- if (!fileStatusListIterator.hasNext()) {
- throw new StoreAccessException(" For extension " +
extensionName
- + " there are no artifacts at the extension store path
" + storePath);
- }
- while (fileStatusListIterator.hasNext()) {
- LocatedFileStatus fileStatus = fileStatusListIterator.next();
- Path filePath = fileStatus.getPath();
- extensionFileMap.put(filePath.getName(), filePath.toString());
- }
- } catch (IOException e) {
- throw new StoreAccessException(e);
- }
- return extensionFileMap;
- }
-
public Map<String, String> getExtensionResources(final String
extensionName) throws StoreAccessException {
Map<String, String> extensionFileMap = new HashMap<>();
@@ -340,7 +306,7 @@ public final class ExtensionStore {
}
FileStatus[] propStatus;
try {
- propStatus = fileSystem.listStatus(new Path(uri.getPath() +
"/META"));
+ propStatus = fileSystem.listStatus(new Path(uri.getPath() ,
"META"));
if (propStatus.length <= 0) {
throw new ValidationException("No properties file is not
present in the " + uri.getPath() + "/META"
+ " structure.");
@@ -360,13 +326,30 @@ public final class ExtensionStore {
return "Extension :" + extensionName + " registered successfully.";
}
- public String getResource(final String extensionName, final String
resourceName) throws FalconException {
- Map<String, String> resources = getExtensionArtifacts(extensionName);
- if (resources.isEmpty()) {
- throw new StoreAccessException("No extension resources found for "
+ extensionName);
+ public String getResource(final String extensionResourcePath)
+ throws FalconException {
+ StringBuilder definition = new StringBuilder();
+ Path resourcePath = new Path(extensionResourcePath);
+ FileSystem fileSystem =
HadoopClientFactory.get().createFalconFileSystem(resourcePath.toUri());
+ try {
+ if (fileSystem.isFile(resourcePath)) {
+
definition.append(getExtensionResource(extensionResourcePath.toString()));
+ } else {
+ RemoteIterator<LocatedFileStatus> fileStatusListIterator =
fileSystem.listFiles(resourcePath, false);
+ while (fileStatusListIterator.hasNext()) {
+ LocatedFileStatus fileStatus =
fileStatusListIterator.next();
+ Path filePath = fileStatus.getPath();
+ definition.append("Contents of file
").append(filePath.getName()).append(":\n");
+
definition.append(getExtensionResource(filePath.toString())).append("\n \n");
+ }
+ }
+ } catch (IOException e) {
+ LOG.error("Exception while getting file(s) with path : " +
extensionResourcePath, e);
+ throw new StoreAccessException(e);
}
- return getExtensionResource(resources.get(resourceName));
+ return definition.toString();
+
}
public Path getExtensionStorePath() {
http://git-wip-us.apache.org/repos/asf/falcon/blob/0825d80a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
----------------------------------------------------------------------
diff --git
a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
index b2fac5f..1c82db1 100644
---
a/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
+++
b/extensions/src/test/java/org/apache/falcon/extensions/store/ExtensionStoreTest.java
@@ -50,7 +50,7 @@ import java.util.Map;
public class ExtensionStoreTest extends AbstractTestExtensionStore {
private static Map<String, String> resourcesMap;
private static JailedFileSystem fs;
- protected static final String EXTENSION_PATH =
"/projects/falcon/extension";
+ protected static final String EXTENSION_PATH =
"/projects/falcon/extension/";
private static final String STORAGE_URL = "jail://global:00";
@BeforeClass
@@ -140,8 +140,7 @@ public class ExtensionStoreTest extends
AbstractTestExtensionStore {
createMETA(extensionPath);
store = ExtensionStore.get();
store.registerExtension("toBeDeleted", STORAGE_URL + extensionPath,
"test desc", "falconUser");
- Assert.assertTrue(store.getResource("toBeDeleted",
"README").equals("README"));
- store.getResource("toBeDeleted", "README");
+ Assert.assertTrue(store.getResource(STORAGE_URL + extensionPath +
"/README").equals("README"));
store.deleteExtension("toBeDeleted", "falconUser");
ExtensionMetaStore metaStore = new ExtensionMetaStore();
Assert.assertEquals(metaStore.getAllExtensions().size(), 0);
http://git-wip-us.apache.org/repos/asf/falcon/blob/0825d80a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
----------------------------------------------------------------------
diff --git
a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
index 2d4f915..a3b6ef1 100644
---
a/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
+++
b/prism/src/main/java/org/apache/falcon/resource/AbstractExtensionManager.java
@@ -268,13 +268,7 @@ public class AbstractExtensionManager extends
AbstractSchedulableEntityManager {
}
protected static void checkIfExtensionIsEnabled(String extensionName) {
- ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
- ExtensionBean extensionBean = metaStore.getDetail(extensionName);
- if (extensionBean == null) {
- LOG.error("Extension not found: " + extensionName);
- throw FalconWebException.newAPIException("Extension not found:" +
extensionName,
- Response.Status.NOT_FOUND);
- }
+ ExtensionBean extensionBean = getExtensionIfExists(extensionName);
if (!extensionBean.getStatus().equals(ExtensionStatus.ENABLED)) {
LOG.error("Extension: " + extensionName + " is in disabled
state.");
throw FalconWebException.newAPIException("Extension: " +
extensionName + " is in disabled state.",
@@ -282,7 +276,7 @@ public class AbstractExtensionManager extends
AbstractSchedulableEntityManager {
}
}
- protected static void checkIfExtensionExists(String extensionName) {
+ protected static ExtensionBean getExtensionIfExists(String extensionName) {
ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
ExtensionBean extensionBean = metaStore.getDetail(extensionName);
if (extensionBean == null) {
@@ -290,6 +284,7 @@ public class AbstractExtensionManager extends
AbstractSchedulableEntityManager {
throw FalconWebException.newAPIException("Extension not found:" +
extensionName,
Response.Status.NOT_FOUND);
}
+ return extensionBean;
}
protected static void checkIfExtensionJobNameExists(String jobName, String
extensionName) {
http://git-wip-us.apache.org/repos/asf/falcon/blob/0825d80a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
----------------------------------------------------------------------
diff --git
a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
index c387f08..61aa39a 100644
---
a/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
+++
b/prism/src/main/java/org/apache/falcon/resource/proxy/ExtensionManagerProxy.java
@@ -20,6 +20,34 @@ package org.apache.falcon.resource.proxy;
import com.sun.jersey.multipart.FormDataBodyPart;
import com.sun.jersey.multipart.FormDataParam;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import javax.servlet.ServletInputStream;
+import javax.servlet.http.HttpServletRequest;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DefaultValue;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.Context;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.xml.bind.JAXBException;
import org.apache.commons.io.IOUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.FalconWebException;
@@ -30,52 +58,24 @@ import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.extensions.Extension;
+import org.apache.falcon.extensions.ExtensionProperties;
import org.apache.falcon.extensions.ExtensionService;
import org.apache.falcon.extensions.ExtensionType;
-import org.apache.falcon.extensions.ExtensionProperties;
import org.apache.falcon.extensions.jdbc.ExtensionMetaStore;
import org.apache.falcon.extensions.store.ExtensionStore;
import org.apache.falcon.persistence.ExtensionBean;
import org.apache.falcon.persistence.ExtensionJobsBean;
-import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.AbstractExtensionManager;
import org.apache.falcon.resource.ExtensionInstanceList;
import org.apache.falcon.resource.ExtensionJobList;
+import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.service.Services;
import org.apache.falcon.util.DeploymentUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.ServletInputStream;
-import javax.servlet.http.HttpServletRequest;
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DefaultValue;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.Context;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-import javax.xml.bind.JAXBException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Arrays;
-import java.util.Set;
-import java.util.Properties;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.SortedMap;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.ByteArrayInputStream;
-
/**
* Jersey Resource for extension job operations.
*/
@@ -101,7 +101,7 @@ public class ExtensionManagerProxy extends
AbstractExtensionManager {
@DefaultValue(ASCENDING_SORT_ORDER) @QueryParam("sortOrder")
String sortOrder,
@DefaultValue("") @QueryParam("doAs") String doAsUser) {
checkIfExtensionServiceIsEnabled();
- checkIfExtensionExists(extensionName);
+ getExtensionIfExists(extensionName);
try {
return super.getExtensionJobs(extensionName, sortOrder, doAsUser);
} catch (Throwable e) {
@@ -341,14 +341,7 @@ public class ExtensionManagerProxy extends
AbstractExtensionManager {
}
private ExtensionType getExtensionType(String extensionName) {
- ExtensionMetaStore metaStore = ExtensionStore.getMetaStore();
- ExtensionBean extensionDetails = metaStore.getDetail(extensionName);
- if (extensionDetails == null) {
- // return failure if the extension job doesn't exist
- LOG.error("Extension not found: " + extensionName);
- throw FalconWebException.newAPIException("Extension not found:" +
extensionName,
- Response.Status.NOT_FOUND);
- }
+ ExtensionBean extensionDetails = getExtensionIfExists(extensionName);
return extensionDetails.getExtensionType();
}
@@ -623,9 +616,10 @@ public class ExtensionManagerProxy extends
AbstractExtensionManager {
public APIResult getExtensionDescription(
@PathParam("extension-name") String extensionName) {
checkIfExtensionServiceIsEnabled();
- validateExtensionName(extensionName);
+ ExtensionBean extensionBean = getExtensionIfExists(extensionName);
try {
- return new APIResult(APIResult.Status.SUCCEEDED,
ExtensionStore.get().getResource(extensionName, README));
+ String extensionResourcePath = extensionBean.getLocation() +
File.separator + README;
+ return new APIResult(APIResult.Status.SUCCEEDED,
ExtensionStore.get().getResource(extensionResourcePath));
} catch (FalconException e) {
throw FalconWebException.newAPIException(e,
Response.Status.BAD_REQUEST);
} catch (Throwable e) {
@@ -694,9 +688,18 @@ public class ExtensionManagerProxy extends
AbstractExtensionManager {
public APIResult getExtensionDefinition(
@PathParam("extension-name") String extensionName) {
checkIfExtensionServiceIsEnabled();
+ ExtensionBean extensionBean = getExtensionIfExists(extensionName);
try {
- return new APIResult(APIResult.Status.SUCCEEDED,
ExtensionStore.get().getResource(extensionName,
- extensionName.toLowerCase() +
EXTENSION_PROPERTY_JSON_SUFFIX));
+ ExtensionType extensionType = extensionBean.getExtensionType();
+ String extensionResourcePath;
+ if (ExtensionType.TRUSTED.equals(extensionType)) {
+ extensionResourcePath = extensionBean.getLocation() + "/META/"
+ + extensionName.toLowerCase() +
EXTENSION_PROPERTY_JSON_SUFFIX;
+ } else {
+ extensionResourcePath = extensionBean.getLocation() + "/META";
+ }
+ return new APIResult(APIResult.Status.SUCCEEDED,
+ ExtensionStore.get().getResource(extensionResourcePath));
} catch (FalconException e) {
throw FalconWebException.newAPIException(e,
Response.Status.BAD_REQUEST);
} catch (Throwable e) {