Repository: incubator-griffin Updated Branches: refs/heads/master aa6a5a6f7 -> f0ee5ec92
[GRIFFIN-184] service for download miss records GRIFFIN-184 - service for download miss records Author: Li, Juan <[email protected]> Closes #365 from icesmartjuan/feature/download. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/f0ee5ec9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/f0ee5ec9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/f0ee5ec9 Branch: refs/heads/master Commit: f0ee5ec9261edb50cb4cb8a69edd348feb953b52 Parents: aa6a5a6 Author: Li, Juan <[email protected]> Authored: Tue Jul 24 08:39:42 2018 +0800 Committer: William Guo <[email protected]> Committed: Tue Jul 24 08:39:42 2018 +0800 ---------------------------------------------------------------------- .../apache/griffin/core/job/JobController.java | 5 +-- .../org/apache/griffin/core/job/JobService.java | 2 ++ .../apache/griffin/core/job/JobServiceImpl.java | 35 ++++++++++++++++++++ .../org/apache/griffin/core/util/FSUtil.java | 20 ++++++++++- 4 files changed, 59 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/job/JobController.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java b/service/src/main/java/org/apache/griffin/core/job/JobController.java index 1d0a8ac..2f65b63 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobController.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java @@ -87,8 +87,9 @@ public class JobController { } @RequestMapping(path = "/jobs/download", method = RequestMethod.GET) - public ResponseEntity<Resource> download(@RequestParam("hdfsPath") String hdfsPath) throws IOException { - InputStreamResource resource = new InputStreamResource(FSUtil.getSampleInputStream(hdfsPath)); + public ResponseEntity<Resource> download(@RequestParam("jobName") String jobName ,@RequestParam("ts") long timestamp) throws Exception { + String path = jobService.getJobHdfsPersistPath(jobName,timestamp); + InputStreamResource resource = new InputStreamResource(FSUtil.getMissSampleInputStream(path)); return ResponseEntity.ok(). header("content-disposition", "attachment; filename = sampleMissingData.json") .contentType(MediaType.APPLICATION_OCTET_STREAM) http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/job/JobService.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java b/service/src/main/java/org/apache/griffin/core/job/JobService.java index 7539cea..269f004 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobService.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java @@ -42,4 +42,6 @@ public interface JobService { List<JobInstanceBean> findInstancesOfJob(Long jobId, int page, int size); JobHealth getHealthInfo(); + + String getJobHdfsPersistPath(String jobName, long timestamp); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java index 8e68567..29cb249 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java @@ -20,6 +20,7 @@ under the License. package org.apache.griffin.core.job; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.commons.lang.StringUtils; import org.apache.griffin.core.exception.GriffinException; import org.apache.griffin.core.job.entity.*; @@ -33,6 +34,8 @@ import org.apache.griffin.core.measure.entity.GriffinMeasure.ProcessType; import org.apache.griffin.core.measure.repo.GriffinMeasureRepo; import org.apache.griffin.core.util.JsonUtil; import org.apache.griffin.core.util.YarnNetUtil; +import org.json.JSONArray; +import org.json.JSONObject; import org.quartz.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,6 +58,8 @@ import java.util.List; import java.util.TimeZone; import static java.util.TimeZone.getTimeZone; +import static org.apache.griffin.core.config.EnvConfig.ENV_BATCH; +import static org.apache.griffin.core.config.EnvConfig.ENV_STREAMING; import static org.apache.griffin.core.exception.GriffinExceptionMessage.*; import static org.apache.griffin.core.job.entity.LivySessionStates.State.*; import static org.apache.griffin.core.job.entity.LivySessionStates.isActive; @@ -511,4 +516,34 @@ public class JobServiceImpl implements JobService { List<JobInstanceBean> instances = instanceRepo.findByJobId(jobId, pageable); return !CollectionUtils.isEmpty(instances) && LivySessionStates.isHealthy(instances.get(0).getState()); } + + @Override + public String getJobHdfsPersistPath(String jobName, long timestamp) { + List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(jobName, false); + if (jobList.size() == 0) { + return null; + } + if (jobList.get(0).getType().toLowerCase().equals("batch")) { + return getPersistPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + ""; + } + + return getPersistPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + ""; + } + + private String getPersistPath(String jsonString) { + try { + JSONObject obj = new JSONObject(jsonString); + JSONArray persistArray = obj.getJSONArray("persist"); + for (int i = 0; i < persistArray.length(); i++) { + if (persistArray.getJSONObject(i).get("type").equals("hdfs")) { + return persistArray.getJSONObject(i).getJSONObject("config").getString("path"); + } + } + + return null; + } catch (Exception ex) { + LOGGER.error("Fail to get Persist path from {}", jsonString, ex); + return null; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/f0ee5ec9/service/src/main/java/org/apache/griffin/core/util/FSUtil.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java index ed40ee2..f6cfd2c 100644 --- a/service/src/main/java/org/apache/griffin/core/util/FSUtil.java +++ b/service/src/main/java/org/apache/griffin/core/util/FSUtil.java @@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.BufferedReader; import java.io.IOException; import java.io.InputStream; @@ -177,4 +176,23 @@ public class FSUtil { } } + public static String getFirstMissRecordPath(String hdfsDir) throws Exception{ + List<FileStatus> fileList = listFileStatus(hdfsDir); + for(int i=0; i<fileList.size();i++){ + if(fileList.get(i).getPath().toUri().toString().toLowerCase().contains("missrecord")){ + return fileList.get(i).getPath().toUri().toString(); + } + } + return null; + } + + public static InputStream getMissSampleInputStream(String path) throws Exception { + List<String> subDirList = listSubDir(path); + //FIXME: only handle 1-sub dir here now + for(int i=0; i< subDirList.size();i++){ + return getSampleInputStream(getFirstMissRecordPath(subDirList.get(i))); + } + return getSampleInputStream(getFirstMissRecordPath(path)); + } + }
