Repository: kylin Updated Branches: refs/heads/master 46d4f97c2 -> 50435e114
KYLIN-3145 Support Kafka JSON message whose property name includes _ Signed-off-by: shaofengshi <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5075f590 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5075f590 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5075f590 Branch: refs/heads/master Commit: 5075f590d843e808186f9d859e0c09ab1877fb0b Parents: 46d4f97 Author: shaofengshi <[email protected]> Authored: Sun Feb 4 16:32:22 2018 +0800 Committer: shaofengshi <[email protected]> Committed: Fri Feb 9 09:53:48 2018 +0800 ---------------------------------------------------------------------- .../apache/kylin/metadata/model/TblColRef.java | 6 +++ .../kylin/source/kafka/StreamingParser.java | 6 ++- .../source/kafka/TimedJsonStreamParser.java | 47 +++++++++++++++----- .../source/kafka/TimedJsonStreamParserTest.java | 36 +++++++++++++-- source-kafka/src/test/resources/message.json | 2 +- 5 files changed, 79 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5075f590/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java ---------------------------------------------------------------------- diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java index b48b0c4..ee33e8a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java @@ -96,12 +96,18 @@ public class TblColRef implements Serializable { // for test mainly public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype) { + return mockup(table, oneBasedColumnIndex, name, datatype, null); + } + + // for test mainly + public static TblColRef 
mockup(TableDesc table, int oneBasedColumnIndex, String name, String datatype, String comment) { ColumnDesc desc = new ColumnDesc(); String id = "" + oneBasedColumnIndex; desc.setId(id); desc.setName(name); desc.setDatatype(datatype); desc.init(table); + desc.setComment(comment); return new TblColRef(desc); } http://git-wip-us.apache.org/repos/asf/kylin/blob/5075f590/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java index c2b5104..c97db36 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java @@ -42,7 +42,8 @@ public abstract class StreamingParser { public static final String PROPERTY_TS_COLUMN_NAME = "tsColName"; public static final String PROPERTY_TS_PARSER = "tsParser"; public static final String PROPERTY_TS_PATTERN = "tsPattern"; - public static final String EMBEDDED_PROPERTY_SEPARATOR = "separator"; + public static final String PROPERTY_EMBEDDED_SEPARATOR = "separator"; + public static final String PROPERTY_STRICT_CHECK = "strictCheck"; // whether to check each column strictly; defaults to false (fault tolerant).
 public static final Map<String, String> defaultProperties = Maps.newHashMap(); public static final Map<String, Integer> derivedTimeColumns = Maps.newHashMap(); @@ -57,7 +58,8 @@ public abstract class StreamingParser { defaultProperties.put(PROPERTY_TS_COLUMN_NAME, "timestamp"); defaultProperties.put(PROPERTY_TS_PARSER, "org.apache.kylin.source.kafka.DefaultTimeParser"); defaultProperties.put(PROPERTY_TS_PATTERN, DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS); - defaultProperties.put(EMBEDDED_PROPERTY_SEPARATOR, "_"); + defaultProperties.put(PROPERTY_EMBEDDED_SEPARATOR, "_"); + defaultProperties.put(PROPERTY_STRICT_CHECK, "false"); } /** http://git-wip-us.apache.org/repos/asf/kylin/blob/5075f590/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index de167b4..3618ba6 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -52,8 +52,9 @@ import com.google.common.collect.Lists; * By default it will parse the timestamp col value as Unix time. If the format isn't Unix time, need specify the time parser * with property StreamingParser#PROPERTY_TS_PARSER. * <p> - * It also support embedded JSON format; Use a separator (customized by StreamingParser#EMBEDDED_PROPERTY_SEPARATOR) to concat - * the property names. + * It also supports embedded JSON: the nested property names are joined with TimedJsonStreamParser#EMBEDDED_PROPERTY_SEPARATOR + * and saved into the column's "comment" field. For example, in "{ 'user' : { 'first_name': 'Tom'}}" the 'first_name' field is + * expressed as the 'user_first_name' column, and its comment value is 'user|first_name'.
*/ public final class TimedJsonStreamParser extends StreamingParser { @@ -64,13 +65,17 @@ public final class TimedJsonStreamParser extends StreamingParser { private String tsColName = null; private String tsParser = null; private String separator = null; + private boolean strictCheck = true; private final Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); private final Map<String, Object> tempMap = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); private final Map<String, String[]> nameMap = new HashMap<>(); + public static final String EMBEDDED_PROPERTY_SEPARATOR = "|"; private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class)); private AbstractTimeParser streamTimeParser; + + private long vcounter = 0; public TimedJsonStreamParser(List<TblColRef> allColumns, Map<String, String> properties) { this.allColumns = allColumns; @@ -80,7 +85,8 @@ public final class TimedJsonStreamParser extends StreamingParser { tsColName = properties.get(PROPERTY_TS_COLUMN_NAME); tsParser = properties.get(PROPERTY_TS_PARSER); - separator = properties.get(EMBEDDED_PROPERTY_SEPARATOR); + separator = properties.get(PROPERTY_EMBEDDED_SEPARATOR); + strictCheck = Boolean.parseBoolean(properties.get(PROPERTY_STRICT_CHECK)); if (!StringUtils.isEmpty(tsParser)) { try { @@ -112,7 +118,7 @@ public final class TimedJsonStreamParser extends StreamingParser { for (TblColRef column : allColumns) { final String columnName = column.getName().toLowerCase(); if (populateDerivedTimeColumns(columnName, result, t) == false) { - result.add(getValueByKey(columnName, root)); + result.add(getValueByKey(column, root)); } } @@ -131,16 +137,30 @@ public final class TimedJsonStreamParser extends StreamingParser { return true; } - protected String getValueByKey(String key, Map<String, Object> rootMap) throws IOException { + public String[] getEmbeddedPropertyNames(TblColRef column) { + final String colName = 
column.getName().toLowerCase(); + String[] names = nameMap.get(colName); + if (names == null) { + String comment = column.getColumnDesc().getComment(); // use comment to parse the structure + if (!StringUtils.isEmpty(comment) && comment.contains(EMBEDDED_PROPERTY_SEPARATOR)) { + names = comment.toLowerCase().split("\\" + EMBEDDED_PROPERTY_SEPARATOR); + nameMap.put(colName, names); + } else if (colName.contains(separator)) { // deprecated, just be compitable for old version + names = colName.toLowerCase().split(separator); + nameMap.put(colName, names); + } + } + + return names; + } + + protected String getValueByKey(TblColRef column, Map<String, Object> rootMap) throws IOException { + final String key = column.getName().toLowerCase(); if (rootMap.containsKey(key)) { return objToString(rootMap.get(key)); } - String[] names = nameMap.get(key); - if (names == null && key.contains(separator)) { - names = key.toLowerCase().split(separator); - nameMap.put(key, names); - } + String[] names = getEmbeddedPropertyNames(column); if (names != null && names.length > 0) { tempMap.clear(); @@ -150,8 +170,11 @@ public final class TimedJsonStreamParser extends StreamingParser { if (o instanceof Map) { tempMap.clear(); tempMap.putAll((Map<String, Object>) o); - } else { - throw new IOException("Property '" + names[i] + "' is not embedded format"); + } else if (strictCheck || vcounter++ % 100 == 0) { + final String msg = "Property '" + names[i] + "' value is not embedded JSON format. 
"; + logger.warn(msg); + if (strictCheck) + throw new IOException(msg); } } Object finalObject = tempMap.get(names[names.length - 1]); http://git-wip-us.apache.org/repos/asf/kylin/blob/5075f590/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java index 8dc840b..b8aa7f3 100644 --- a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java +++ b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java @@ -43,9 +43,11 @@ import com.fasterxml.jackson.databind.type.SimpleType; public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase { private static String[] userNeedColNames; + private static String[] userNeedColNamesComment; private static final String jsonFilePath = "src/test/resources/message.json"; private static ObjectMapper mapper; - private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class)); + private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), + SimpleType.construct(Object.class)); @BeforeClass public static void setUp() throws Exception { @@ -75,8 +77,11 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase { @Test public void testEmbeddedValue() throws Exception { - userNeedColNames = new String[] { "user_id", "user_description", "user_isProtected" }; - List<TblColRef> allCol = mockupTblColRefList(); + userNeedColNames = new String[] { "user_id", "user_description", "user_isProtected", + "user_is_Default_Profile_Image" }; + userNeedColNamesComment = new String[] { "", "", "", + "user" + TimedJsonStreamParser.EMBEDDED_PROPERTY_SEPARATOR + 
"is_Default_Profile_Image" }; + List<TblColRef> allCol = mockupTblColRefListWithComment(userNeedColNamesComment); TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null); Object msg = mapper.readValue(new File(jsonFilePath), mapType); ByteBuffer buffer = getJsonByteBuffer(msg); @@ -85,6 +90,21 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase { assertEquals("4853763947", result.get(0)); assertEquals("Noticias", result.get(1)); assertEquals("false", result.get(2)); + assertEquals("false", result.get(3)); + } + + @Test + public void testEmbeddedValueFaultTolerant() throws Exception { + userNeedColNames = new String[] { "user_id", "nonexisted_description" }; + userNeedColNamesComment = new String[] { "", "" }; + List<TblColRef> allCol = mockupTblColRefList(); + TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null); + Object msg = mapper.readValue(new File(jsonFilePath), mapType); + ByteBuffer buffer = getJsonByteBuffer(msg); + List<StreamingMessageRow> msgList = parser.parse(buffer); + List<String> result = msgList.get(0).getData(); + assertEquals("4853763947", result.get(0)); + assertEquals(StringUtils.EMPTY, result.get(1)); } @Test @@ -143,4 +163,14 @@ public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase { } return list; } + + private static List<TblColRef> mockupTblColRefListWithComment(String[] comments) { + TableDesc t = TableDesc.mockup("table_a"); + List<TblColRef> list = new ArrayList<>(); + for (int i = 0; i < userNeedColNames.length; i++) { + TblColRef c = TblColRef.mockup(t, i, userNeedColNames[i], "string", comments[i]); + list.add(c); + } + return list; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5075f590/source-kafka/src/test/resources/message.json ---------------------------------------------------------------------- diff --git a/source-kafka/src/test/resources/message.json b/source-kafka/src/test/resources/message.json index 55f35d3..27183eb 100644 --- 
a/source-kafka/src/test/resources/message.json +++ b/source-kafka/src/test/resources/message.json @@ -36,7 +36,7 @@ "user": { "id": 4853763947, "description": "Noticias", - "isDefaultProfileImage": false, + "is_Default_Profile_Image": false, "isProtected": false } } \ No newline at end of file
