Repository: incubator-griffin Updated Branches: refs/heads/master 34f06afee -> 8e9b50ff9
change partition format and fix details map ignore bug Author: ahutsunshine <[email protected]> Closes #181 from ahutsunshine/master. Project: http://git-wip-us.apache.org/repos/asf/incubator-griffin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-griffin/commit/8e9b50ff Tree: http://git-wip-us.apache.org/repos/asf/incubator-griffin/tree/8e9b50ff Diff: http://git-wip-us.apache.org/repos/asf/incubator-griffin/diff/8e9b50ff Branch: refs/heads/master Commit: 8e9b50ff9b335f8e7e0b58ac3ac031c23b6c2f3e Parents: 34f06af Author: ahutsunshine <[email protected]> Authored: Fri Nov 24 11:17:14 2017 +0800 Committer: Lionel Liu <[email protected]> Committed: Fri Nov 24 11:17:14 2017 +0800 ---------------------------------------------------------------------- .../org/apache/griffin/core/job/SparkSubmitJob.java | 9 +++++---- .../griffin/core/measure/entity/DataConnector.java | 16 ++++++---------- .../apache/griffin/core/measure/entity/Rule.java | 7 ++++++- 3 files changed, 17 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8e9b50ff/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java index d620cac..a1e1e9d 100644 --- a/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java +++ b/service/src/main/java/org/apache/griffin/core/job/SparkSubmitJob.java @@ -141,7 +141,7 @@ public class SparkSubmitJob implements Job { measure.setName(jd.getJobDataMap().getString("jobName")); } - private void setAllDataConnectorPartitions(List<DataSource> sources, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) { + private void setAllDataConnectorPartitions(List<DataSource> sources, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) throws IOException { if (sources == null) { return; } @@ -150,7 +150,7 @@ public class SparkSubmitJob implements Job { } } - private void setDataSourcePartitions(DataSource dataSource, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) { + private void setDataSourcePartitions(DataSource dataSource, String[] patternItemSet, String[] partitionItems, String sourceName, long timestamp) throws IOException { String name = dataSource.getName(); for (DataConnector dataConnector : dataSource.getConnectors()) { if (sourceName.equals(name)) { @@ -159,16 +159,17 @@ public class SparkSubmitJob implements Job { } } - private void setDataConnectorPartitions(DataConnector dc, String[] patternItemSet, String[] partitionItems, long timestamp) { + private void setDataConnectorPartitions(DataConnector dc, String[] patternItemSet, String[] partitionItems, long timestamp) throws IOException { Map<String, String> partitionItemMap = genPartitionMap(patternItemSet, partitionItems, timestamp); /** * partitions must be a string like: "dt=20170301, hour=12" * partitionItemMap.toString() is like "{dt=20170301, hour=12}" */ String partitions = partitionItemMap.toString().substring(1, partitionItemMap.toString().length() - 1); + partitions = partitions.replaceAll(",", " AND "); Map<String, String> configMap = dc.getConfigInMaps(); //config should not be null - configMap.put("partitions", partitions); + configMap.put("where", partitions); try { dc.setConfig(configMap); } catch (JsonProcessingException e) { http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8e9b50ff/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java index a5b80f9..a36c240 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/DataConnector.java @@ -22,6 +22,7 @@ package org.apache.griffin.core.measure.entity; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.commons.lang.StringUtils; import org.apache.griffin.core.util.JsonUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,24 +48,19 @@ public class DataConnector extends AbstractAuditableEntity { @Transient private Map<String, String> configInMaps; - public Map<String, String> getConfigInMaps() { - TypeReference<Map<String, String>> mapType = new TypeReference<Map<String, String>>() { - }; - if (this.configInMaps == null) { - try { - this.configInMaps = JsonUtil.toEntity(config, mapType); - } catch (IOException e) { - LOGGER.error("Error in converting json to map. {}", e.getMessage()); - } + public Map<String, String> getConfigInMaps() throws IOException { + if (this.configInMaps == null && !StringUtils.isEmpty(config)) { + this.configInMaps = JsonUtil.toEntity(config, new TypeReference<Map<String, String>>() {}); } return configInMaps; } public void setConfig(Map<String, String> configInMaps) throws JsonProcessingException { + this.configInMaps = configInMaps; this.config = JsonUtil.toJson(configInMaps); } - public Map<String, String> getConfig() { + public Map<String, String> getConfig() throws IOException { return getConfigInMaps(); } http://git-wip-us.apache.org/repos/asf/incubator-griffin/blob/8e9b50ff/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java ---------------------------------------------------------------------- diff --git a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java index 99bd514..b060bc4 100644 --- a/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java +++ b/service/src/main/java/org/apache/griffin/core/measure/entity/Rule.java @@ -22,6 +22,8 @@ package org.apache.griffin.core.measure.entity; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonInclude; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.commons.lang.StringUtils; import org.apache.griffin.core.util.JsonUtil; import javax.persistence.Column; @@ -89,7 +91,10 @@ public class Rule extends AbstractAuditableEntity { } @JsonProperty("details") - public Map<String, Object> getDetailsMap() { + public Map<String, Object> getDetailsMap() throws IOException { + if (detailsMap == null && !StringUtils.isEmpty(details)) { + detailsMap = JsonUtil.toEntity(details, new TypeReference<Map<String, Object>>() {}); + } return detailsMap; }
