Repository: incubator-griffin
Updated Branches:
  refs/heads/master 23ff999cd -> 7b749ad72
update measure field to support new format and ut

1.update env_batch.json and env_streaming.json
2.Rule
   - add "inDataFrameName" and "outDataFrameName", remove "name"
   - add "out" param array, move "metric", "record" param inside "out" array
  DataSource
   - add boolean field "baseline"
   - change "cache" to "checkpoint"
  DataConnector
   - add "dataFrameName"
  Measure
   - add "sinks" string array
   - update dqType from String to enum
  JobServiceImpl
   - change "persist" to "sinks"
   - compare literal string "hdfs" case insensitively
3.update measure ut and fix predicate ut bug

Author: ahutsunshine <ahutsunsh...@gmail.com>

Closes #389 from ahutsunshine/master.

Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/7b749ad7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/7b749ad7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/7b749ad7

Branch: refs/heads/master
Commit: 7b749ad72a78eb4244bcf47a80a52f2a6e3f222f
Parents: 23ff999
Author: ahutsunshine <ahutsunsh...@gmail.com>
Authored: Fri Aug 10 18:28:48 2018 +0800
Committer: William Guo <gu...@apache.org>
Committed: Fri Aug 10 18:28:48 2018 +0800

----------------------------------------------------------------------
 .../griffin/core/job/BatchJobOperatorImpl.java  |   2 +-
 .../apache/griffin/core/job/JobController.java  |   2 +-
 .../apache/griffin/core/job/JobInstance.java    |   6 +-
 .../org/apache/griffin/core/job/JobService.java |   2 +-
 .../apache/griffin/core/job/JobServiceImpl.java |  13 ++-
 .../griffin/core/job/entity/JobDataSegment.java |  14 +--
 .../core/measure/entity/DataConnector.java      |  19 ++-
 .../griffin/core/measure/entity/DataSource.java |  50 +++++---
 .../core/measure/entity/GriffinMeasure.java     |   5 +-
 .../griffin/core/measure/entity/Measure.java    |  65 +++++++++--
 .../griffin/core/measure/entity/Rule.java       | 115 +++++++++----------
 .../measure/entity/StreamingPreProcess.java     |  23 +++-
 service/src/main/resources/env/env_batch.json   |  50 +++-----
 .../src/main/resources/env/env_streaming.json   |  24 ++--
 .../core/job/FileExistPredicatorTest.java       |   4 +
 .../core/measure/repo/MeasureRepoTest.java      |  19 ++-
 .../apache/griffin/core/util/EntityHelper.java  |  42 ++++---
 17 files changed, 275 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
index 41f5142..bc73cd8 100644
--- a/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
+++ b/service/src/main/java/org/apache/griffin/core/job/BatchJobOperatorImpl.java
@@ -315,7 +315,7 @@ public class BatchJobOperatorImpl implements JobOperator {
     private boolean isValidBaseLine(List<JobDataSegment> segments) {
         assert segments != null;
         for (JobDataSegment jds : segments) {
-            if (jds.isBaseline()) {
+            if (jds.isAsTsBaseline()) {
                 return true;
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobController.java
----------------------------------------------------------------------
diff --git a/service/src/main/java/org/apache/griffin/core/job/JobController.java
b/service/src/main/java/org/apache/griffin/core/job/JobController.java index ebfdd91..64b8e42 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobController.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobController.java @@ -93,7 +93,7 @@ public class JobController { @RequestMapping(path = "/jobs/download", method = RequestMethod.GET) public ResponseEntity<Resource> download(@RequestParam("jobName") String jobName, @RequestParam("ts") long timestamp) throws Exception { - String path = jobService.getJobHdfsPersistPath(jobName, timestamp); + String path = jobService.getJobHdfsSinksPath(jobName, timestamp); InputStreamResource resource = new InputStreamResource(FSUtil.getMissSampleInputStream(path)); return ResponseEntity.ok(). header("content-disposition", "attachment; filename = sampleMissingData.json") http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobInstance.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java index 18a5a96..ab67c81 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobInstance.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobInstance.java @@ -140,7 +140,7 @@ public class JobInstance implements Job { private void setSourcesPartitionsAndPredicates(List<DataSource> sources) throws Exception { boolean isFirstBaseline = true; for (JobDataSegment jds : job.getSegments()) { - if (jds.isBaseline() && isFirstBaseline) { + if (jds.isAsTsBaseline() && isFirstBaseline) { Long tsOffset = TimeUtil.str2Long(jds.getSegmentRange().getBegin()); measure.setTimestamp(jobStartTime + tsOffset); isFirstBaseline = false; @@ -347,7 +347,7 @@ public class JobInstance implements Job { private void preProcessMeasure() throws IOException { for (DataSource source : measure.getDataSources()) { - Map cacheMap = source.getCacheMap(); + Map cacheMap = source.getCheckpointMap(); //to skip batch job if (cacheMap == null) { return; @@ -357,7 +357,7 @@ public class JobInstance implements Job { cache = cache.replaceAll("\\$\\{SOURCE_NAME}", source.getName()); cache = cache.replaceAll("\\$\\{TARGET_NAME}", source.getName()); cacheMap = toEntity(cache, Map.class); - source.setCacheMap(cacheMap); + source.setCheckpointMap(cacheMap); } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobService.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobService.java b/service/src/main/java/org/apache/griffin/core/job/JobService.java index 65766e2..58e541f 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobService.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobService.java @@ -44,5 +44,5 @@ public interface JobService { JobHealth getHealthInfo(); - String getJobHdfsPersistPath(String jobName, long timestamp); + String getJobHdfsSinksPath(String jobName, long timestamp); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java 
index b041c0b..5617065 100644 --- a/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java +++ b/service/src/main/java/org/apache/griffin/core/job/JobServiceImpl.java @@ -545,24 +545,25 @@ public class JobServiceImpl implements JobService { } @Override - public String getJobHdfsPersistPath(String jobName, long timestamp) { + public String getJobHdfsSinksPath(String jobName, long timestamp) { List<AbstractJob> jobList = jobRepo.findByJobNameAndDeleted(jobName, false); if (jobList.size() == 0) { return null; } if (jobList.get(0).getType().toLowerCase().equals("batch")) { - return getPersistPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + ""; + return getSinksPath(ENV_BATCH) + "/" + jobName + "/" + timestamp + ""; } - return getPersistPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + ""; + return getSinksPath(ENV_STREAMING) + "/" + jobName + "/" + timestamp + ""; } - private String getPersistPath(String jsonString) { + private String getSinksPath(String jsonString) { try { JSONObject obj = new JSONObject(jsonString); - JSONArray persistArray = obj.getJSONArray("persist"); + JSONArray persistArray = obj.getJSONArray("sinks"); for (int i = 0; i < persistArray.length(); i++) { - if (persistArray.getJSONObject(i).get("type").equals("hdfs")) { + Object type = persistArray.getJSONObject(i).get("type"); + if (type instanceof String && "hdfs".equalsIgnoreCase(String.valueOf(type))) { return persistArray.getJSONObject(i).getJSONObject("config").getString("path"); } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java index 62abe3e..a990abe 100644 --- a/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java +++ b/service/src/main/java/org/apache/griffin/core/job/entity/JobDataSegment.java @@ -43,19 +43,19 @@ public class JobDataSegment extends AbstractAuditableEntity { @NotNull private String dataConnectorName; - private boolean baseline = false; + private boolean asTsBaseline = false; @OneToOne(fetch = FetchType.EAGER, cascade = {CascadeType.PERSIST, CascadeType.REMOVE, CascadeType.MERGE}) @JoinColumn(name = "segment_range_id") private SegmentRange segmentRange = new SegmentRange(); @JsonProperty("as.baseline") - public boolean isBaseline() { - return baseline; + public boolean isAsTsBaseline() { + return asTsBaseline; } - public void setBaseline(boolean baseline) { - this.baseline = baseline; + public void setAsTsBaseline(boolean asTsBaseline) { + this.asTsBaseline = asTsBaseline; } @JsonProperty("segment.range") @@ -85,12 +85,12 @@ public class JobDataSegment extends AbstractAuditableEntity { public JobDataSegment(String dataConnectorName, boolean baseline) { this.dataConnectorName = dataConnectorName; - this.baseline = baseline; + this.asTsBaseline = baseline; } public JobDataSegment(String dataConnectorName, boolean baseline, SegmentRange segmentRange) { this.dataConnectorName = dataConnectorName; - this.baseline = baseline; + this.asTsBaseline = baseline; this.segmentRange = segmentRange; } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java ---------------------------------------------------------------------- diff --git 
a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java index 2ec3ed7..2a63b57 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java @@ -73,6 +73,9 @@ public class DataConnector extends AbstractAuditableEntity { private String version; @JsonInclude(JsonInclude.Include.NON_NULL) + private String dataFrameName; + + @JsonInclude(JsonInclude.Include.NON_NULL) private String dataUnit; @JsonInclude(JsonInclude.Include.NON_NULL) @@ -131,6 +134,15 @@ public class DataConnector extends AbstractAuditableEntity { return config; } + @JsonProperty("dataframe.name") + public String getDataFrameName() { + return dataFrameName; + } + + public void setDataFrameName(String dataFrameName) { + this.dataFrameName = dataFrameName; + } + @JsonProperty("data.unit") public String getDataUnit() { return dataUnit; @@ -204,16 +216,19 @@ public class DataConnector extends AbstractAuditableEntity { public DataConnector() { } - public DataConnector(String name, DataType type, String version, String config) throws IOException { + public DataConnector(String name, DataType type, String version, + String config, String dataFrameName) throws IOException { this.name = name; this.type = type; this.version = version; this.config = config; this.configMap = JsonUtil.toEntity(config, new TypeReference<Map<String, Object>>() { }); + this.dataFrameName = dataFrameName; } - public DataConnector(String name, String dataUnit, Map configMap, List<SegmentPredicate> predicates) { + public DataConnector(String name, String dataUnit, Map configMap, + List<SegmentPredicate> predicates) { this.name = name; this.dataUnit = dataUnit; this.configMap = configMap; http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java index 59878cc..79525b2 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataSource.java @@ -55,13 +55,15 @@ public class DataSource extends AbstractAuditableEntity { @JoinColumn(name = "data_source_id") private List<DataConnector> connectors = new ArrayList<>(); + private boolean baseline = false; + @JsonIgnore @Column(length = 1024) - private String cache; + private String checkpoint; @Transient @JsonInclude(JsonInclude.Include.NON_NULL) - private Map<String, Object> cacheMap; + private Map<String, Object> checkpointMap; public String getName() { @@ -83,36 +85,44 @@ public class DataSource extends AbstractAuditableEntity { this.connectors = connectors; } - private String getCache() { - return cache; + public boolean isBaseline() { + return baseline; + } + + public void setBaseline(boolean baseline) { + this.baseline = baseline; + } + + private String getCheckpoint() { + return checkpoint; } - private void setCache(String cache) { - this.cache = cache; + private void setCheckpoint(String checkpoint) { + this.checkpoint = checkpoint; } - @JsonProperty("cache") - public Map<String, Object> getCacheMap() { - return cacheMap; + @JsonProperty("checkpoint") + public Map<String, 
Object> getCheckpointMap() { + return checkpointMap; } - public void setCacheMap(Map<String, Object> cacheMap) { - this.cacheMap = cacheMap; + public void setCheckpointMap(Map<String, Object> checkpointMap) { + this.checkpointMap = checkpointMap; } @PrePersist @PreUpdate public void save() throws JsonProcessingException { - if (cacheMap != null) { - this.cache = JsonUtil.toJson(cacheMap); + if (checkpointMap != null) { + this.checkpoint = JsonUtil.toJson(checkpointMap); } } @PostLoad public void load() throws IOException { - if (!StringUtils.isEmpty(cache)) { - this.cacheMap = JsonUtil.toEntity(cache, new TypeReference<Map<String, Object>>() { + if (!StringUtils.isEmpty(checkpoint)) { + this.checkpointMap = JsonUtil.toEntity(checkpoint, new TypeReference<Map<String, Object>>() { }); } } @@ -124,4 +134,14 @@ public class DataSource extends AbstractAuditableEntity { this.name = name; this.connectors = connectors; } + + public DataSource(String name, boolean baseline, + Map<String, Object> checkpointMap, + List<DataConnector> connectors) { + this.name = name; + this.baseline = baseline; + this.checkpointMap = checkpointMap; + this.connectors = connectors; + + } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java index 6dd3172..e4a1e41 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/GriffinMeasure.java @@ -156,11 +156,14 @@ public class GriffinMeasure extends Measure { super(); } - public GriffinMeasure(String name, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) { + public GriffinMeasure(String name, String owner, List<DataSource> dataSources, + EvaluateRule evaluateRule, + List<String> sinksList) { this.name = name; this.owner = owner; this.dataSources = dataSources; this.evaluateRule = evaluateRule; + setSinksList(sinksList); } public GriffinMeasure(Long measureId, String name, String owner, List<DataSource> dataSources, EvaluateRule evaluateRule) { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java index fcecc1d..a831ea3 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Measure.java @@ -19,17 +19,18 @@ under the License. 
package org.apache.griffin.core.measure.entity; -import com.fasterxml.jackson.annotation.JsonInclude; -import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.annotation.JsonSubTypes; -import com.fasterxml.jackson.annotation.JsonTypeInfo; - -import javax.persistence.Entity; -import javax.persistence.EnumType; -import javax.persistence.Enumerated; -import javax.persistence.Inheritance; -import javax.persistence.InheritanceType; +import com.fasterxml.jackson.annotation.*; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.commons.lang.StringUtils; +import org.apache.griffin.core.util.JsonUtil; + +import javax.persistence.*; import javax.validation.constraints.NotNull; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; @Entity @Inheritance(strategy = InheritanceType.JOINED) @@ -53,6 +54,13 @@ public abstract class Measure extends AbstractAuditableEntity { @JsonInclude(JsonInclude.Include.NON_NULL) private String organization; + @Transient + @JsonInclude(JsonInclude.Include.NON_NULL) + private List<String> sinksList = Arrays.asList("ELASTICSEARCH", "HDFS"); + + @JsonIgnore + private String sinks; + private boolean deleted = false; public String getName() { @@ -96,6 +104,23 @@ public abstract class Measure extends AbstractAuditableEntity { this.owner = owner; } + @JsonProperty("sinks") + public List<String> getSinksList() { + return sinksList; + } + + public void setSinksList(List<String> sinksList) { + this.sinksList = sinksList; + } + + private String getSinks() { + return sinks; + } + + private void setSinks(String sinks) { + this.sinks = sinks; + } + public boolean isDeleted() { return deleted; } @@ -104,6 +129,26 @@ public abstract class Measure extends AbstractAuditableEntity { this.deleted = deleted; } + @PrePersist + @PreUpdate + public void save() throws JsonProcessingException { + if (sinksList != null) { + this.sinks = JsonUtil.toJson(sinksList); + } else { + this.sinks = null; + } + } + + @PostLoad + public void load() throws IOException { + if (!StringUtils.isEmpty(sinks)) { + this.sinksList = JsonUtil.toEntity(sinks, new TypeReference<List<String>>() { + }); + } else { + this.sinksList = null; + } + } + public Measure() { } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java index 033af2e..10203e2 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java @@ -26,13 +26,9 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import java.io.IOException; +import java.util.List; import java.util.Map; -import javax.persistence.Column; -import javax.persistence.Entity; -import javax.persistence.PostLoad; -import javax.persistence.PrePersist; -import javax.persistence.PreUpdate; -import javax.persistence.Transient; +import javax.persistence.*; import javax.validation.constraints.NotNull; import org.apache.commons.lang.StringUtils; @@ -49,14 +45,18 @@ public class Rule extends AbstractAuditableEntity { @NotNull private String dslType; - @NotNull - private 
String dqType; + @Enumerated(EnumType.STRING) + private DqType dqType; @Column(length = 8 * 1024) @NotNull private String rule; - private String name; + @JsonInclude(JsonInclude.Include.NON_NULL) + private String inDataFrameName; + + @JsonInclude(JsonInclude.Include.NON_NULL) + private String outDataFrameName; @JsonIgnore // @Access(AccessType.PROPERTY) @@ -67,21 +67,12 @@ public class Rule extends AbstractAuditableEntity { @JsonInclude(JsonInclude.Include.NON_NULL) private Map<String, Object> detailsMap; - @JsonIgnore -// @Access(AccessType.PROPERTY) - private String metric; - @Transient @JsonInclude(JsonInclude.Include.NON_NULL) - private Map<String, Object> metricMap; + private List<Map<String, Object>> outList; @JsonIgnore -// @Access(AccessType.PROPERTY) - private String record; - - @Transient - @JsonInclude(JsonInclude.Include.NON_NULL) - private Map<String, Object> recordMap; + private String out; @JsonProperty("dsl.type") public String getDslType() { @@ -93,11 +84,11 @@ public class Rule extends AbstractAuditableEntity { } @JsonProperty("dq.type") - public String getDqType() { + public DqType getDqType() { return dqType; } - public void setDqType(String dqType) { + public void setDqType(DqType dqType) { this.dqType = dqType; } @@ -109,31 +100,31 @@ public class Rule extends AbstractAuditableEntity { this.rule = rule; } - @JsonProperty("details") - public Map<String, Object> getDetailsMap() { - return detailsMap; + @JsonProperty("in.dataframe.name") + public String getInDataFrameName() { + return inDataFrameName; } - public void setDetailsMap(Map<String, Object> detailsMap) { - this.detailsMap = detailsMap; + public void setInDataFrameName(String inDataFrameName) { + this.inDataFrameName = inDataFrameName; } - @JsonProperty("metric") - public Map<String, Object> getMetricMap() { - return metricMap; + @JsonProperty("out.dataframe.name") + public String getOutDataFrameName() { + return outDataFrameName; } - public void setMetricMap(Map<String, Object> metricMap) { - this.metricMap = metricMap; + public void setOutDataFrameName(String outDataFrameName) { + this.outDataFrameName = outDataFrameName; } - @JsonProperty("record") - public Map<String, Object> getRecordMap() { - return recordMap; + @JsonProperty("details") + public Map<String, Object> getDetailsMap() { + return detailsMap; } - public void setRecordMap(Map<String, Object> recordMap) { - this.recordMap = recordMap; + public void setDetailsMap(Map<String, Object> detailsMap) { + this.detailsMap = detailsMap; } private String getDetails() { @@ -144,24 +135,21 @@ public class Rule extends AbstractAuditableEntity { this.details = details; } - private void setMetric(String metric) { - this.metric = metric; - } - - private String getRecord() { - return record; + @JsonProperty("out") + public List<Map<String, Object>> getOutList() { + return outList; } - private void setRecord(String record) { - this.record = record; + public void setOutList(List<Map<String, Object>> outList) { + this.outList = outList; } - public String getName() { - return name; + private String getOut() { + return out; } - public void setName(String name) { - this.name = name; + private void setOut(String out) { + this.out = out; } @PrePersist @@ -170,13 +158,9 @@ public class Rule extends AbstractAuditableEntity { if (detailsMap != null) { this.details = JsonUtil.toJson(detailsMap); } - if (metricMap != null) { - this.metric = JsonUtil.toJson(metricMap); + if (outList != null) { + this.out = JsonUtil.toJson(outList); } - if (recordMap != null) { - this.record = 
JsonUtil.toJson(recordMap); - } - } @PostLoad @@ -185,12 +169,8 @@ public class Rule extends AbstractAuditableEntity { this.detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() { }); } - if (!StringUtils.isEmpty(metric)) { - this.metricMap = JsonUtil.toEntity(metric, new TypeReference<Map<String, Object>>() { - }); - } - if (!StringUtils.isEmpty(record)) { - this.recordMap = JsonUtil.toEntity(record, new TypeReference<Map<String, Object>>() { + if (!StringUtils.isEmpty(out)) { + this.outList = JsonUtil.toEntity(out, new TypeReference<List<Map<String, Object>>>() { }); } } @@ -198,10 +178,21 @@ public class Rule extends AbstractAuditableEntity { public Rule() { } - public Rule(String dslType, String dqType, String rule, Map<String, Object> detailsMap) throws JsonProcessingException { + public Rule(String dslType, DqType dqType, String rule, Map<String, Object> detailsMap) throws JsonProcessingException { this.dslType = dslType; this.dqType = dqType; this.rule = rule; + this.detailsMap = detailsMap; this.details = JsonUtil.toJson(detailsMap); } + + public Rule(String dslType, DqType dqType, String rule, + String inDataFrameName, String outDataFrameName, + Map<String, Object> detailsMap, + List<Map<String, Object>> outList) throws JsonProcessingException { + this(dslType, dqType, rule, detailsMap); + this.inDataFrameName = inDataFrameName; + this.outDataFrameName = outDataFrameName; + this.outList = outList; + } } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java b/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java index 9902c14..4be8e6c 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/StreamingPreProcess.java @@ -43,7 +43,9 @@ public class StreamingPreProcess extends AbstractAuditableEntity { private String dslType; - private String name; + private String inDataFrameName; + + private String outDataFrameName; private String rule; @@ -64,14 +66,25 @@ public class StreamingPreProcess extends AbstractAuditableEntity { this.dslType = dslType; } - public String getName() { - return name; + @JsonProperty("in.dataframe.name") + public String getInDataFrameName() { + return inDataFrameName; + } + + public void setInDataFrameName(String inDataFrameName) { + this.inDataFrameName = inDataFrameName; } - public void setName(String name) { - this.name = name; + @JsonProperty("out.dataframe.name") + public String getOutDataFrameName() { + return outDataFrameName; } + public void setOutDataFrameName(String outDataFrameName) { + this.outDataFrameName = outDataFrameName; + } + + public String getRule() { return rule; } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/resources/env/env_batch.json ---------------------------------------------------------------------- diff --git a/service/src/main/resources/env/env_batch.json b/service/src/main/resources/env/env_batch.json index 1e3160f..72a3839 100644 --- a/service/src/main/resources/env/env_batch.json +++ b/service/src/main/resources/env/env_batch.json @@ -1,55 +1,31 @@ { "spark": { - "log.level": "WARN", - "checkpoint.dir": "hdfs:///griffin/checkpoint/${JOB_NAME}", - 
"batch.interval": "20s", - "process.interval": "1m", - "config": { - "spark.default.parallelism": 4, - "spark.task.maxFailures": 5, - "spark.streaming.kafkaMaxRatePerPartition": 1000, - "spark.streaming.concurrentJobs": 4, - "spark.yarn.maxAppAttempts": 5, - "spark.yarn.am.attemptFailuresValidityInterval": "1h", - "spark.yarn.max.executor.failures": 120, - "spark.yarn.executor.failuresValidityInterval": "1h", - "spark.hadoop.fs.hdfs.impl.disable.cache": true - } + "log.level": "WARN" }, - "persist": [ + "sinks": [ { - "type": "log", + "type": "CONSOLE", "config": { - "max.log.lines": 2 + "max.log.lines": 10 } }, { - "type": "hdfs", + "type": "HDFS", "config": { - "path": "hdfs:///griffin/persist" + "path": "hdfs:///griffin/persist", + "max.persist.lines": 10000, + "max.lines.per.file": 10000 } }, { - "type": "http", + "type": "ELASTICSEARCH", "config": { "method": "post", - "api": "http://es:9200/griffin/accuracy" - } - } - ], - "info.cache": [ - { - "type": "zk", - "config": { - "hosts": "zk:2181", - "namespace": "griffin/infocache", - "lock.path": "lock", - "mode": "persist", - "init.clear": false, - "close.clear": false + "api": "http://es:9200/griffin/accuracy", + "connection.timeout": "1m", + "retry": 10 } } ], - "cleaner": { - } + "griffin.checkpoint": [] } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/main/resources/env/env_streaming.json ---------------------------------------------------------------------- diff --git a/service/src/main/resources/env/env_streaming.json b/service/src/main/resources/env/env_streaming.json index 5c4b190..6508af1 100644 --- a/service/src/main/resources/env/env_streaming.json +++ b/service/src/main/resources/env/env_streaming.json @@ -3,8 +3,8 @@ "log.level": "WARN", "checkpoint.dir": "hdfs:///griffin/checkpoint/${JOB_NAME}", "init.clear": true, - "batch.interval": "30s", - "process.interval": "3m", + "batch.interval": "1m", + "process.interval": "5m", "config": { "spark.default.parallelism": 4, "spark.task.maxFailures": 5, @@ -17,28 +17,30 @@ "spark.hadoop.fs.hdfs.impl.disable.cache": true } }, - "persist": [ + "sinks": [ { - "type": "log", + "type": "CONSOLE", "config": { - "max.log.lines": 2 + "max.log.lines": 100 } }, { - "type": "hdfs", + "type": "HDFS", "config": { - "path": "hdfs:///griffin/persist" + "path": "hdfs:///griffin/persist", + "max.persist.lines": 10000, + "max.lines.per.file": 10000 } }, { - "type": "http", + "type": "ELASTICSEARCH", "config": { "method": "post", "api": "http://es:9200/griffin/accuracy" } } ], - "info.cache": [ + "griffin.checkpoint": [ { "type": "zk", "config": { @@ -50,7 +52,5 @@ "close.clear": false } } - ], - "cleaner": { - } + ] } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java b/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java index 0d144be..af981e3 100644 --- a/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java +++ b/service/src/test/java/org/apache/griffin/core/job/FileExistPredicatorTest.java @@ -21,7 +21,11 @@ public class FileExistPredicatorTest { @BeforeClass public static void mkFile() throws IOException { + File fileDirectory = new File(rootPath); // to fix createFileExclusively exception File file = new File(rootPath + fileName); + if 
(!fileDirectory.exists()) { + fileDirectory.mkdir(); + } if (!file.exists()) { file.createNewFile(); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java index aa80608..ef37f97 100644 --- a/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java +++ b/service/src/test/java/org/apache/griffin/core/measure/repo/MeasureRepoTest.java @@ -21,11 +21,12 @@ package org.apache.griffin.core.measure.repo; import static org.apache.griffin.core.util.EntityHelper.createGriffinMeasure; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; import java.util.List; import org.apache.griffin.core.config.EclipseLinkJpaConfigForTest; -import org.apache.griffin.core.measure.entity.Measure; +import org.apache.griffin.core.measure.entity.*; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -57,7 +58,21 @@ public class MeasureRepoTest { public void testFindByNameAndDeleted() { String name = "m1"; List<Measure> measures = measureRepo.findByNameAndDeleted(name, false); - assertThat(measures.get(0).getName()).isEqualTo(name); + GriffinMeasure m = (GriffinMeasure) measures.get(0); + + List<DataSource> sources = m.getDataSources(); + DataConnector connector = sources.get(0).getConnectors().get(0); + Rule rule = m.getEvaluateRule().getRules().get(0); + assertEquals(m.getSinksList().size(), 2); + assertEquals(sources.get(0).isBaseline(), true); + assertEquals(sources.get(0).getCheckpointMap().size(), 1); + assertEquals(connector.getDataFrameName(), "kafka"); + assertEquals(connector.getConfigMap().size(), 3); + assertEquals(rule.getDqType(), DqType.ACCURACY); + assertEquals(rule.getInDataFrameName(), "in"); + assertEquals(rule.getOutDataFrameName(), "out"); + assertEquals(rule.getDetailsMap().size(), 1); + assertEquals(rule.getOutList().size(), 2); } @Test http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/7b749ad7/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java ---------------------------------------------------------------------- diff --git a/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java b/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java index e2254d1..4c28ea7 100644 --- a/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java +++ b/service/src/test/java/org/apache/griffin/core/util/EntityHelper.java @@ -43,12 +43,7 @@ import org.apache.griffin.core.job.entity.LivySessionStates; import org.apache.griffin.core.job.entity.SegmentPredicate; import org.apache.griffin.core.job.entity.SegmentRange; import org.apache.griffin.core.job.entity.VirtualJob; -import org.apache.griffin.core.measure.entity.DataConnector; -import org.apache.griffin.core.measure.entity.DataSource; -import org.apache.griffin.core.measure.entity.EvaluateRule; -import org.apache.griffin.core.measure.entity.ExternalMeasure; -import org.apache.griffin.core.measure.entity.GriffinMeasure; -import org.apache.griffin.core.measure.entity.Rule; +import org.apache.griffin.core.measure.entity.*; import org.quartz.JobDataMap; import org.quartz.JobKey; import org.quartz.SimpleTrigger; @@ -81,17 +76,34 @@ public class EntityHelper { 
public static GriffinMeasure createGriffinMeasure(String name, DataConnector dcSource, DataConnector dcTarget) throws Exception { - DataSource dataSource = new DataSource("source", Arrays.asList(dcSource)); - DataSource targetSource = new DataSource("target", Arrays.asList(dcTarget)); + DataSource dataSource = new DataSource("source", true, createCheckpointMap(), Arrays.asList(dcSource)); + DataSource targetSource = new DataSource("target", false, createCheckpointMap(), Arrays.asList(dcTarget)); List<DataSource> dataSources = new ArrayList<>(); dataSources.add(dataSource); dataSources.add(targetSource); - String rules = "source.id=target.id AND source.name=target.name AND source.age=target.age"; - Map<String, Object> map = new HashMap<>(); - map.put("detail", "detail info"); - Rule rule = new Rule("griffin-dsl", "ACCURACY", rules, map); + Rule rule = createRule(); EvaluateRule evaluateRule = new EvaluateRule(Arrays.asList(rule)); - return new GriffinMeasure(name, "test", dataSources, evaluateRule); + return new GriffinMeasure(name, "test", dataSources, evaluateRule, Arrays.asList("ELASTICSEARCH", "HDFS")); + } + + private static Rule createRule() throws JsonProcessingException { + Map<String, Object> map = new HashMap<>(); + map.put("detail", "detail"); + String rule = "source.id=target.id AND source.name=target.name AND source.age=target.age"; + Map<String, Object> metricMap = new HashMap<>(); + Map<String, Object> recordMap = new HashMap<>(); + metricMap.put("type", "metric"); + metricMap.put("name", "accu"); + recordMap.put("type", "record"); + recordMap.put("name", "missRecords"); + List<Map<String, Object>> outList = Arrays.asList(metricMap, recordMap); + return new Rule("griffin-dsl", DqType.ACCURACY, rule, "in", "out", map, outList); + } + + private static Map<String, Object> createCheckpointMap() { + Map<String, Object> map = new HashMap<>(); + map.put("info.path", "source"); + return map; } public static DataConnector createDataConnector(String name, @@ -102,14 +114,14 @@ public class EntityHelper { config.put("database", database); config.put("table.name", table); config.put("where", where); - return new DataConnector(name, "1h", config, null); + return new DataConnector(name, DataConnector.DataType.HIVE, "1.2", JsonUtil.toJson(config), "kafka"); } public static DataConnector createDataConnector(String name, String database, String table, String where, - SegmentPredicate predicate) throws IOException { + SegmentPredicate predicate) { HashMap<String, String> config = new HashMap<>(); config.put("database", database); config.put("table.name", table);
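
For readers following the schema change described in the commit message above, here is a minimal, non-authoritative sketch of how the new entity fields fit together, assembled from the constructors and the updated EntityHelper test fixture in this diff. The class name, helper method, database/table names, dataframe names, and the rule expression are illustrative placeholders, and DqType is assumed to live in the measure entity package as suggested by the test imports.

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.apache.griffin.core.measure.entity.*;
import org.apache.griffin.core.util.JsonUtil;

public class NewMeasureFormatSketch {

    // Hypothetical helper, not part of this commit: builds a connector whose
    // last argument is the new "dataframe.name" field on DataConnector.
    private static DataConnector connector(String name, String table,
                                           String dataFrameName) throws Exception {
        Map<String, String> config = new HashMap<>();
        config.put("database", "default");
        config.put("table.name", table);
        return new DataConnector(name, DataConnector.DataType.HIVE, "1.2",
                JsonUtil.toJson(config), dataFrameName);
    }

    public static GriffinMeasure build() throws Exception {
        // DataSource now carries a boolean "baseline" flag and a "checkpoint"
        // map (formerly "cache").
        Map<String, Object> checkpoint = new HashMap<>();
        checkpoint.put("info.path", "source");
        DataSource source = new DataSource("source", true, checkpoint,
                Arrays.asList(connector("source_conn", "demo_src", "source_df")));
        DataSource target = new DataSource("target", false, checkpoint,
                Arrays.asList(connector("target_conn", "demo_tgt", "target_df")));

        // Rule: dqType is now an enum, "name" is replaced by the in/out
        // dataframe names, and the former "metric"/"record" maps move into
        // the "out" list.
        Map<String, Object> metricOut = new HashMap<>();
        metricOut.put("type", "metric");
        metricOut.put("name", "accu");
        Map<String, Object> recordOut = new HashMap<>();
        recordOut.put("type", "record");
        recordOut.put("name", "missRecords");
        Map<String, Object> details = new HashMap<>();
        details.put("detail", "detail info");
        Rule rule = new Rule("griffin-dsl", DqType.ACCURACY,
                "source.id=target.id AND source.name=target.name",
                "in", "out", details, Arrays.asList(metricOut, recordOut));

        // The measure itself now takes a "sinks" list instead of relying on
        // the env-level "persist" section.
        return new GriffinMeasure("measure_demo", "test",
                Arrays.asList(source, target),
                new EvaluateRule(Arrays.asList(rule)),
                Arrays.asList("ELASTICSEARCH", "HDFS"));
    }
}

Serialized through the @JsonProperty names visible in this diff ("sinks", "baseline", "checkpoint", "dataframe.name", "dq.type", "in.dataframe.name", "out.dataframe.name", "out"), this structure should correspond to the measure JSON the service now expects.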