Repository: incubator-griffin Updated Branches: refs/heads/master 8171ab254 -> d6b4abb9f
[GRIFFIN-117] implement sample missing download service, see details in Author: wenzhao <[email protected]> Closes #276 from vzhao/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/d6b4abb9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/d6b4abb9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/d6b4abb9 Branch: refs/heads/master Commit: d6b4abb9fadd0e85bf61eb1fd3f0a1262241411c Parents: 8171ab2 Author: wenzhao <[email protected]> Authored: Thu May 10 18:06:13 2018 +0800 Committer: Lionel Liu <[email protected]> Committed: Thu May 10 18:06:13 2018 +0800 ---------------------------------------------------------------------- griffin-doc/service/api-guide.md | 10 ++++ .../core/exception/GriffinExceptionMessage.java | 1 + .../apache/griffin/core/job/JobController.java | 17 +++++- .../core/measure/MeasureOrgServiceImpl.java | 2 + .../org/apache/griffin/core/util/FSUtil.java | 59 ++++++++++++++------ 5 files changed, 71 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d6b4abb9/griffin-doc/service/api-guide.md ---------------------------------------------------------------------- diff --git a/griffin-doc/service/api-guide.md b/griffin-doc/service/api-guide.md index cdd73ac..2a95f06 100644 --- a/griffin-doc/service/api-guide.md +++ b/griffin-doc/service/api-guide.md @@ -865,6 +865,16 @@ The response body should be empty if no error happens, and the HTTP status is (2 } ``` +### Download sample missing/mismatched records +`GET /api/v1/jobs/download?hdfsPath={missingDataFilePath}` + +#### Response +``` +If successful, this method returns missing records in the response body, +maximum record count is 100. + +``` + <h2 id = "4"></h2> ## Metrics ### Get metrics http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d6b4abb9/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java index e298797..7b9c06c 100644 --- a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java +++ b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java @@ -38,6 +38,7 @@ public enum GriffinExceptionMessage { JOB_ID_DOES_NOT_EXIST(40402, "Job id does not exist"), JOB_NAME_DOES_NOT_EXIST(40403, "Job name does not exist"), ORGANIZATION_NAME_DOES_NOT_EXIST(40404, "Organization name does not exist"), + HDFS_FILE_NOT_EXIST(40405, "Hadoop data file not exist"), //409, "Conflict" MEASURE_NAME_ALREADY_EXIST(40901, "Measure name already exists"), http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d6b4abb9/service/src/main/java/org/apache/griffin/core/job/JobController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java index 2d09d8b..81b7fb7 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobController.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java @@ -23,10 +23,16 @@ import org.apache.griffin.core.job.entity.JobDataBean; import org.apache.griffin.core.job.entity.JobHealth; import org.apache.griffin.core.job.entity.JobInstanceBean; import org.apache.griffin.core.job.entity.JobSchedule; +import org.apache.griffin.core.util.FSUtil; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.InputStreamResource; +import org.springframework.core.io.Resource; import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import java.io.IOException; import java.util.List; @RestController @@ -73,6 +79,13 @@ public class JobController { public JobHealth getHealthInfo() { return jobService.getHealthInfo(); } -} - + @RequestMapping(path = "/jobs/download", method = RequestMethod.GET) + public ResponseEntity<Resource> download(@RequestParam("hdfsPath") String hdfsPath) throws IOException { + InputStreamResource resource = new InputStreamResource(FSUtil.getSampleInputStream(hdfsPath)); + return ResponseEntity.ok(). + header("content-disposition", "attachment; filename = sampleMissingData.json") + .contentType(MediaType.APPLICATION_OCTET_STREAM) + .body(resource); + } +} http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d6b4abb9/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java index c498893..bedae0e 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java @@ -32,6 +32,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.ORGANIZATION_NAME_DOES_NOT_EXIST; + @Service public class MeasureOrgServiceImpl implements MeasureOrgService { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d6b4abb9/service/src/main/java/org/apache/griffin/core/util/FSUtil.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java index d7eedca..126fbea 100644 --- a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java +++ b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java @@ -19,25 +19,31 @@ under the License. package org.apache.griffin.core.util; +import org.apache.commons.io.Charsets; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.exception.GriffinException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; +import java.io.BufferedReader; import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; import java.util.ArrayList; import java.util.List; +import static org.apache.griffin.core.exception.GriffinExceptionMessage.HDFS_FILE_NOT_EXIST; + @Component public class FSUtil { private static final Logger LOGGER = LoggerFactory.getLogger(FSUtil.class); + private static final int SAMPLE_ROW_COUNT = 100; private static String fsDefaultName; @@ -82,9 +88,7 @@ public class FSUtil { * list all sub dir of a dir */ public static List<String> listSubDir(String dir) throws IOException { - if (getFileSystem() == null) { - throw new NullPointerException("FileSystem is null.Please check your hdfs config default name."); - } + checkHDFSConf(); List<String> fileList = new ArrayList<>(); Path path = new Path(dir); if (fileSystem.isFile(path)) { @@ -104,9 +108,7 @@ public class FSUtil { * get all file status of a dir. */ public static List<FileStatus> listFileStatus(String dir) throws IOException { - if (getFileSystem() == null) { - throw new NullPointerException("FileSystem is null.Please check your hdfs config default name."); - } + checkHDFSConf(); List<FileStatus> fileStatusList = new ArrayList<>(); Path path = new Path(dir); if (fileSystem.isFile(path)) { @@ -125,9 +127,7 @@ public class FSUtil { * touch file */ public static void touch(String filePath) throws IOException { - if (getFileSystem() == null) { - throw new NullPointerException("FileSystem is null.Please check your hdfs config default name."); - } + checkHDFSConf(); Path path = new Path(filePath); FileStatus st; if (fileSystem.exists(path)) { @@ -149,13 +149,40 @@ public class FSUtil { } - public static boolean isFileExist(String path) throws IOException { + checkHDFSConf(); + Path hdfsPath = new Path(path); + return fileSystem.isFile(hdfsPath) || fileSystem.isDirectory(hdfsPath); + } + + public static InputStream getSampleInputStream(String path) throws IOException { + checkHDFSConf(); + if (isFileExist(path)) { + FSDataInputStream missingData = fileSystem.open(new Path(path)); + BufferedReader bufReader = new BufferedReader(new InputStreamReader(missingData, Charsets.UTF_8)); + String line = null; + int rowCnt = 0; + StringBuilder output = new StringBuilder(1024); + + while ((line = bufReader.readLine()) != null) { + if (rowCnt < SAMPLE_ROW_COUNT) { + output.append(line); + output.append("\n"); + } + rowCnt++; + } + + return IOUtils.toInputStream(output, Charsets.UTF_8); + } else { + LOGGER.warn("HDFS file does not exist.", path); + throw new GriffinException.NotFoundException(HDFS_FILE_NOT_EXIST); + } + } + + private static void checkHDFSConf() { if (getFileSystem() == null) { throw new NullPointerException("FileSystem is null.Please check your hdfs config default name."); } - Path hdfsPath = new Path(path); - return fileSystem.isFile(hdfsPath) || fileSystem.isDirectory(hdfsPath); } }
