Repository: incubator-griffin
Updated Branches:
  refs/heads/master 8171ab254 -> d6b4abb9f


[GRIFFIN-117] implement sample missing download service, see details in

Author: wenzhao <[email protected]>

Closes #276 from vzhao/master.


Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/d6b4abb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/d6b4abb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/d6b4abb9

Branch: refs/heads/master
Commit: d6b4abb9fadd0e85bf61eb1fd3f0a1262241411c
Parents: 8171ab2
Author: wenzhao <[email protected]>
Authored: Thu May 10 18:06:13 2018 +0800
Committer: Lionel Liu <[email protected]>
Committed: Thu May 10 18:06:13 2018 +0800

----------------------------------------------------------------------
 griffin-doc/service/api-guide.md                | 10 ++++
 .../core/exception/GriffinExceptionMessage.java |  1 +
 .../apache/griffin/core/job/JobController.java  | 17 +++++-
 .../core/measure/MeasureOrgServiceImpl.java     |  2 +
 .../org/apache/griffin/core/util/FSUtil.java    | 59 ++++++++++++++------
 5 files changed, 71 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d6b4abb9/griffin-doc/service/api-guide.md
----------------------------------------------------------------------
diff --git a/griffin-doc/service/api-guide.md b/griffin-doc/service/api-guide.md
index cdd73ac..2a95f06 100644
--- a/griffin-doc/service/api-guide.md
+++ b/griffin-doc/service/api-guide.md
@@ -865,6 +865,16 @@ The response body should be empty if no error happens, and 
the HTTP status is (2
 }
 ```
 
+### Download sample missing/mismatched records
+`GET /api/v1/jobs/download?hdfsPath={missingDataFilePath}`
+
+#### Response
+```
+If successful, this method returns missing records in the response body, 
+maximum record count is 100.
+
+```
+
 <h2 id = "4"></h2>
 ## Metrics
 ### Get metrics

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d6b4abb9/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
 
b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
index e298797..7b9c06c 100644
--- 
a/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
+++ 
b/service/src/main/java/org/apache/griffin/core/exception/GriffinExceptionMessage.java
@@ -38,6 +38,7 @@ public enum GriffinExceptionMessage {
     JOB_ID_DOES_NOT_EXIST(40402, "Job id does not exist"),
     JOB_NAME_DOES_NOT_EXIST(40403, "Job name does not exist"),
     ORGANIZATION_NAME_DOES_NOT_EXIST(40404, "Organization name does not 
exist"),
+    HDFS_FILE_NOT_EXIST(40405, "Hadoop data file not exist"),
 
     //409, "Conflict"
     MEASURE_NAME_ALREADY_EXIST(40901, "Measure name already exists"),

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d6b4abb9/service/src/main/java/org/apache/griffin/core/job/JobController.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/job/JobController.java 
b/service/src/main/java/org/apache/griffin/core/job/JobController.java
index 2d09d8b..81b7fb7 100644
--- a/service/src/main/java/org/apache/griffin/core/job/JobController.java
+++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java
@@ -23,10 +23,16 @@ import org.apache.griffin.core.job.entity.JobDataBean;
 import org.apache.griffin.core.job.entity.JobHealth;
 import org.apache.griffin.core.job.entity.JobInstanceBean;
 import org.apache.griffin.core.job.entity.JobSchedule;
+import org.apache.griffin.core.util.FSUtil;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.io.InputStreamResource;
+import org.springframework.core.io.Resource;
 import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 
+import java.io.IOException;
 import java.util.List;
 
 @RestController
@@ -73,6 +79,13 @@ public class JobController {
     public JobHealth getHealthInfo() {
         return jobService.getHealthInfo();
     }
-}
-
 
+    @RequestMapping(path = "/jobs/download", method = RequestMethod.GET)
+    public ResponseEntity<Resource> download(@RequestParam("hdfsPath") String 
hdfsPath) throws IOException {
+        InputStreamResource resource = new 
InputStreamResource(FSUtil.getSampleInputStream(hdfsPath));
+        return ResponseEntity.ok().
+                header("content-disposition", "attachment; filename = 
sampleMissingData.json")
+                .contentType(MediaType.APPLICATION_OCTET_STREAM)
+                .body(resource);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d6b4abb9/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
----------------------------------------------------------------------
diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
index c498893..bedae0e 100644
--- 
a/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
+++ 
b/service/src/main/java/org/apache/griffin/core/measure/MeasureOrgServiceImpl.java
@@ -32,6 +32,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.griffin.core.exception.GriffinExceptionMessage.ORGANIZATION_NAME_DOES_NOT_EXIST;
+
 @Service
 public class MeasureOrgServiceImpl implements MeasureOrgService {
 

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/d6b4abb9/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java 
b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
index d7eedca..126fbea 100644
--- a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
+++ b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java
@@ -19,25 +19,31 @@ under the License.
 
 package org.apache.griffin.core.util;
 
+import org.apache.commons.io.Charsets;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang.StringUtils;
+import org.apache.griffin.core.exception.GriffinException;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.util.ArrayList;
 import java.util.List;
 
+import static 
org.apache.griffin.core.exception.GriffinExceptionMessage.HDFS_FILE_NOT_EXIST;
+
 @Component
 public class FSUtil {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(FSUtil.class);
+    private static final int SAMPLE_ROW_COUNT = 100;
 
     private static String fsDefaultName;
 
@@ -82,9 +88,7 @@ public class FSUtil {
      * list all sub dir of a dir
      */
     public static List<String> listSubDir(String dir) throws IOException {
-        if (getFileSystem() == null) {
-            throw new NullPointerException("FileSystem is null.Please check 
your hdfs config default name.");
-        }
+        checkHDFSConf();
         List<String> fileList = new ArrayList<>();
         Path path = new Path(dir);
         if (fileSystem.isFile(path)) {
@@ -104,9 +108,7 @@ public class FSUtil {
      * get all file status of a dir.
      */
     public static List<FileStatus> listFileStatus(String dir) throws 
IOException {
-        if (getFileSystem() == null) {
-            throw new NullPointerException("FileSystem is null.Please check 
your hdfs config default name.");
-        }
+        checkHDFSConf();
         List<FileStatus> fileStatusList = new ArrayList<>();
         Path path = new Path(dir);
         if (fileSystem.isFile(path)) {
@@ -125,9 +127,7 @@ public class FSUtil {
      * touch file
      */
     public static void touch(String filePath) throws IOException {
-        if (getFileSystem() == null) {
-            throw new NullPointerException("FileSystem is null.Please check 
your hdfs config default name.");
-        }
+        checkHDFSConf();
         Path path = new Path(filePath);
         FileStatus st;
         if (fileSystem.exists(path)) {
@@ -149,13 +149,40 @@ public class FSUtil {
 
     }
 
-
     public static boolean isFileExist(String path) throws IOException {
+        checkHDFSConf();
+        Path hdfsPath = new Path(path);
+        return fileSystem.isFile(hdfsPath) || fileSystem.isDirectory(hdfsPath);
+    }
+
+    public static InputStream getSampleInputStream(String path) throws 
IOException {
+        checkHDFSConf();
+        if (isFileExist(path)) {
+            FSDataInputStream missingData = fileSystem.open(new Path(path));
+            BufferedReader bufReader = new BufferedReader(new 
InputStreamReader(missingData, Charsets.UTF_8));
+            String line = null;
+            int rowCnt = 0;
+            StringBuilder output = new StringBuilder(1024);
+
+            while ((line = bufReader.readLine()) != null) {
+                if (rowCnt < SAMPLE_ROW_COUNT) {
+                    output.append(line);
+                    output.append("\n");
+                }
+                rowCnt++;
+            }
+
+            return IOUtils.toInputStream(output, Charsets.UTF_8);
+        } else {
+            LOGGER.warn("HDFS file does not exist.", path);
+            throw new GriffinException.NotFoundException(HDFS_FILE_NOT_EXIST);
+        }
+    }
+
+    private static void checkHDFSConf() {
         if (getFileSystem() == null) {
             throw new NullPointerException("FileSystem is null.Please check 
your hdfs config default name.");
         }
-        Path hdfsPath = new Path(path);
-        return fileSystem.isFile(hdfsPath) || fileSystem.isDirectory(hdfsPath);
     }
 
 }

Reply via email to