Repository: ambari
Updated Branches:
  refs/heads/branch-2.1 c4d2e839b -> 5222188ac
AMBARI-12028. Add Better Error Handling for accessing HDFS (Erik Bergenholtz via rlevas) Project: http://git-wip-us.apache.org/repos/asf/ambari/repo Commit: http://git-wip-us.apache.org/repos/asf/ambari/commit/5222188a Tree: http://git-wip-us.apache.org/repos/asf/ambari/tree/5222188a Diff: http://git-wip-us.apache.org/repos/asf/ambari/diff/5222188a Branch: refs/heads/branch-2.1 Commit: 5222188ac7df2a3404da6b25ea01f9940cbd5cc4 Parents: c4d2e83 Author: Erik Bergenholtz <[email protected]> Authored: Mon Jun 22 11:42:37 2015 -0400 Committer: Robert Levas <[email protected]> Committed: Mon Jun 22 11:42:37 2015 -0400 ---------------------------------------------------------------------- contrib/views/hive/pom.xml | 7 +- .../ui/hive-web/app/controllers/index.js | 4 +- .../ui/hive-web/app/controllers/open-queries.js | 1 - .../hive-web/app/controllers/visual-explain.js | 6 -- contrib/views/utils/pom.xml | 7 +- .../apache/ambari/view/utils/hdfs/HdfsApi.java | 82 +++++++++++++++----- 6 files changed, 77 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/views/hive/pom.xml b/contrib/views/hive/pom.xml index 2130c4a..afb80a3 100644 --- a/contrib/views/hive/pom.xml +++ b/contrib/views/hive/pom.xml @@ -194,10 +194,15 @@ <artifactId>commons-validator</artifactId> <version>1.4.0</version> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.4</version> + </dependency> </dependencies> <properties> - <hadoop-version>2.6.0</hadoop-version> + <hadoop-version>2.7.0</hadoop-version> <ambari.dir>${project.parent.parent.parent.basedir}</ambari.dir> <hive-version>1.0.0</hive-version> <ambari.version>2.1.0-SNAPSHOT</ambari.version> 
http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js index b3cd127..72fd37c 100644 --- a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js +++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/index.js @@ -109,7 +109,7 @@ export default Ember.Controller.extend({ return true; }.property('model.isRunning', '[email protected]'), - parseQueryParams: function () { + currentQueryObserver: function () { var query = this.get('openQueries.currentQuery.fileContent'), param, updatedParams = [], @@ -129,6 +129,8 @@ export default Ember.Controller.extend({ }); currentParams.setObjects(updatedParams); + + this.set('visualExplain.shouldChangeGraph', true); }.observes('openQueries.currentQuery.fileContent'), _executeQuery: function (referrer, shouldExplain, shouldGetVisualExplain) { http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js index 5ab46f6..611d8fe 100644 --- a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js +++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/open-queries.js @@ -196,7 +196,6 @@ export default Ember.ArrayController.extend({ var content = query.get('fileContent'); content = self.get('index').buildQuery(query); content = self.get('index').bindQueryParams(content); - content = 
self.get('index').prependQuerySettings(content); //update query tab path with saved model id if its a new record if (wasNew) { http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js ---------------------------------------------------------------------- diff --git a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js index d6ae8c4..71e3c87 100644 --- a/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js +++ b/contrib/views/hive/src/main/resources/ui/hive-web/app/controllers/visual-explain.js @@ -24,14 +24,8 @@ export default Ember.Controller.extend({ notifyService: Ember.inject.service(constants.namingConventions.notify), index: Ember.inject.controller(), - openQueries: Ember.inject.controller(), - verticesProgress: Ember.computed.alias('jobProgressService.currentJob.stages'), - observeCurrentQuery: function () { - this.set('shouldChangeGraph', true); - }.observes('openQueries.currentQuery', 'openQueries.currentQuery.fileContent'), - actions: { onTabOpen: function () { var self = this; http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/utils/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/views/utils/pom.xml b/contrib/views/utils/pom.xml index 3e1ca4f..f930061 100644 --- a/contrib/views/utils/pom.xml +++ b/contrib/views/utils/pom.xml @@ -116,11 +116,16 @@ <artifactId>gson</artifactId> <version>2.2.2</version> </dependency> + <dependency> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + <version>2.4</version> + </dependency> </dependencies> <properties> <ambari.dir>${project.parent.parent.parent.basedir}</ambari.dir> - <hadoop-version>2.6.0</hadoop-version> + <hadoop-version>2.7.0</hadoop-version> </properties> <build> </build> 
http://git-wip-us.apache.org/repos/asf/ambari/blob/5222188a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java ---------------------------------------------------------------------- diff --git a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java b/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java index 80603b5..c7ae952 100644 --- a/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java +++ b/contrib/views/utils/src/main/java/org/apache/ambari/view/utils/hdfs/HdfsApi.java @@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.web.WebHdfsFileSystem; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.json.simple.JSONArray; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.security.auth.Subject; @@ -42,7 +44,10 @@ import javax.security.auth.Subject; * Hdfs Business Delegate */ public class HdfsApi { -private final Configuration conf; + private final static Logger LOG = + LoggerFactory.getLogger(HdfsApi.class); + + private final Configuration conf; private final Map<String, String> authParams; private FileSystem fs; @@ -60,7 +65,7 @@ private final Configuration conf; conf = configurationBuilder.buildConfig(); ugi = UserGroupInformation.createProxyUser(username, getProxyUser()); - fs = ugi.doAs(new PrivilegedExceptionAction<FileSystem>() { + fs = execute(new PrivilegedExceptionAction<FileSystem>() { public FileSystem run() throws IOException { return FileSystem.get(conf); } @@ -100,7 +105,7 @@ private final Configuration conf; */ public FileStatus[] listdir(final String path) throws FileNotFoundException, IOException, InterruptedException { - return ugi.doAs(new PrivilegedExceptionAction<FileStatus[]>() { + return execute(new PrivilegedExceptionAction<FileStatus[]>() { public FileStatus[] run() throws FileNotFoundException, Exception { return fs.listStatus(new Path(path)); } @@ 
-117,7 +122,7 @@ private final Configuration conf; */ public FileStatus getFileStatus(final String path) throws IOException, FileNotFoundException, InterruptedException { - return ugi.doAs(new PrivilegedExceptionAction<FileStatus>() { + return execute(new PrivilegedExceptionAction<FileStatus>() { public FileStatus run() throws FileNotFoundException, IOException { return fs.getFileStatus(new Path(path)); } @@ -133,7 +138,7 @@ private final Configuration conf; */ public boolean mkdir(final String path) throws IOException, InterruptedException { - return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + return execute(new PrivilegedExceptionAction<Boolean>() { public Boolean run() throws Exception { return fs.mkdirs(new Path(path)); } @@ -150,7 +155,7 @@ private final Configuration conf; */ public boolean rename(final String src, final String dst) throws IOException, InterruptedException { - return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + return execute(new PrivilegedExceptionAction<Boolean>() { public Boolean run() throws Exception { return fs.rename(new Path(src), new Path(dst)); } @@ -163,7 +168,7 @@ private final Configuration conf; * @throws Exception */ public boolean trashEnabled() throws Exception { - return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + return execute(new PrivilegedExceptionAction<Boolean>() { public Boolean run() throws IOException { Trash tr = new Trash(fs, conf); return tr.isEnabled(); @@ -177,7 +182,7 @@ private final Configuration conf; * @throws Exception */ public Path getHomeDir() throws Exception { - return ugi.doAs(new PrivilegedExceptionAction<Path>() { + return execute(new PrivilegedExceptionAction<Path>() { public Path run() throws IOException { return fs.getHomeDirectory(); } @@ -190,7 +195,7 @@ private final Configuration conf; * @throws Exception */ public synchronized FsStatus getStatus() throws Exception { - return ugi.doAs(new PrivilegedExceptionAction<FsStatus>() { + return execute(new 
PrivilegedExceptionAction<FsStatus>() { public FsStatus run() throws IOException { return fs.getStatus(); } @@ -203,7 +208,7 @@ private final Configuration conf; * @throws Exception */ public Path getTrashDir() throws Exception { - return ugi.doAs(new PrivilegedExceptionAction<Path>() { + return execute(new PrivilegedExceptionAction<Path>() { public Path run() throws IOException { TrashPolicy trashPolicy = TrashPolicy.getInstance(conf, fs, fs.getHomeDirectory()); @@ -246,7 +251,7 @@ private final Configuration conf; * @throws Exception */ public Void emptyTrash() throws Exception { - return ugi.doAs(new PrivilegedExceptionAction<Void>() { + return execute(new PrivilegedExceptionAction<Void>() { public Void run() throws IOException { Trash tr = new Trash(fs, conf); tr.expunge(); @@ -264,7 +269,7 @@ private final Configuration conf; */ public boolean moveToTrash(final String path) throws IOException, InterruptedException { - return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + return execute(new PrivilegedExceptionAction<Boolean>() { public Boolean run() throws Exception { return Trash.moveToAppropriateTrash(fs, new Path(path), conf); } @@ -281,7 +286,7 @@ private final Configuration conf; */ public boolean delete(final String path, final boolean recursive) throws IOException, InterruptedException { - return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + return execute(new PrivilegedExceptionAction<Boolean>() { public Boolean run() throws Exception { return fs.delete(new Path(path), recursive); } @@ -298,7 +303,7 @@ private final Configuration conf; */ public FSDataOutputStream create(final String path, final boolean overwrite) throws IOException, InterruptedException { - return ugi.doAs(new PrivilegedExceptionAction<FSDataOutputStream>() { + return execute(new PrivilegedExceptionAction<FSDataOutputStream>() { public FSDataOutputStream run() throws Exception { return fs.create(new Path(path), overwrite); } @@ -314,7 +319,7 @@ private final 
Configuration conf; */ public FSDataInputStream open(final String path) throws IOException, InterruptedException { - return ugi.doAs(new PrivilegedExceptionAction<FSDataInputStream>() { + return execute(new PrivilegedExceptionAction<FSDataInputStream>() { public FSDataInputStream run() throws Exception { return fs.open(new Path(path)); } @@ -330,7 +335,7 @@ private final Configuration conf; */ public boolean chmod(final String path, final String permissions) throws IOException, InterruptedException { - return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + return execute(new PrivilegedExceptionAction<Boolean>() { public Boolean run() throws Exception { try { fs.setPermission(new Path(path), FsPermission.valueOf(permissions)); @@ -349,8 +354,8 @@ private final Configuration conf; * @throws java.io.IOException * @throws InterruptedException */ - public synchronized void copy(final String src, final String dest) throws IOException, InterruptedException, HdfsApiException { - boolean result = ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + public void copy(final String src, final String dest) throws IOException, InterruptedException, HdfsApiException { + boolean result = execute(new PrivilegedExceptionAction<Boolean>() { public Boolean run() throws Exception { return FileUtil.copy(fs, new Path(src), fs, new Path(dest), false, conf); } @@ -361,8 +366,8 @@ private final Configuration conf; } } - public synchronized boolean exists(final String newFilePath) throws IOException, InterruptedException { - return ugi.doAs(new PrivilegedExceptionAction<Boolean>() { + public boolean exists(final String newFilePath) throws IOException, InterruptedException { + return execute(new PrivilegedExceptionAction<Boolean>() { public Boolean run() throws Exception { return fs.exists(new Path(newFilePath)); } @@ -370,6 +375,43 @@ private final Configuration conf; } /** + * Executes action on HDFS using doAs + * @param action strategy object + * @param <T> result type + * @return 
result of operation + * @throws IOException + * @throws InterruptedException + */ + public <T> T execute(PrivilegedExceptionAction<T> action) + throws IOException, InterruptedException { + T result = null; + + // Retry strategy applied here due to HDFS-1058. HDFS can throw random + // IOException about retrieving block from DN if concurrent read/write + // on specific file is performed (see details on HDFS-1058). + int tryNumber = 0; + boolean succeeded = false; + do { + tryNumber += 1; + try { + result = ugi.doAs(action); + succeeded = true; + } catch (IOException ex) { + if (!ex.getMessage().contains("Cannot obtain block length for")) { + throw ex; + } + if (tryNumber >= 3) { + throw ex; + } + LOG.info("HDFS threw 'IOException: Cannot obtain block length' exception. " + + "Retrying... Try #" + (tryNumber + 1)); + Thread.sleep(1000); //retry after 1 second + } + } while (!succeeded); + return result; + } + + /** * Converts a Hadoop permission into a Unix permission symbolic representation * (i.e. -rwxr--r--) or default if the permission is NULL. *
