Repository: kylin
Updated Branches:
  refs/heads/master 46d4f97c2 -> 50435e114


KYLIN-3145 Support Kafka JSON message whose property name includes _

Signed-off-by: shaofengshi <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5075f590
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5075f590
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5075f590

Branch: refs/heads/master
Commit: 5075f590d843e808186f9d859e0c09ab1877fb0b
Parents: 46d4f97
Author: shaofengshi <[email protected]>
Authored: Sun Feb 4 16:32:22 2018 +0800
Committer: shaofengshi <[email protected]>
Committed: Fri Feb 9 09:53:48 2018 +0800

----------------------------------------------------------------------
 .../apache/kylin/metadata/model/TblColRef.java  |  6 +++
 .../kylin/source/kafka/StreamingParser.java     |  6 ++-
 .../source/kafka/TimedJsonStreamParser.java     | 47 +++++++++++++++-----
 .../source/kafka/TimedJsonStreamParserTest.java | 36 +++++++++++++--
 source-kafka/src/test/resources/message.json    |  2 +-
 5 files changed, 79 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5075f590/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java 
b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
index b48b0c4..ee33e8a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/TblColRef.java
@@ -96,12 +96,18 @@ public class TblColRef implements Serializable {
 
     // for test mainly
     public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, 
String name, String datatype) {
+        return mockup(table, oneBasedColumnIndex, name, datatype, null);
+    }
+
+    // for test mainly
+    public static TblColRef mockup(TableDesc table, int oneBasedColumnIndex, 
String name, String datatype, String comment) {
         ColumnDesc desc = new ColumnDesc();
         String id = "" + oneBasedColumnIndex;
         desc.setId(id);
         desc.setName(name);
         desc.setDatatype(datatype);
         desc.init(table);
+        desc.setComment(comment);
         return new TblColRef(desc);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/5075f590/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index c2b5104..c97db36 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -42,7 +42,8 @@ public abstract class StreamingParser {
     public static final String PROPERTY_TS_COLUMN_NAME = "tsColName";
     public static final String PROPERTY_TS_PARSER = "tsParser";
     public static final String PROPERTY_TS_PATTERN = "tsPattern";
-    public static final String EMBEDDED_PROPERTY_SEPARATOR = "separator";
+    public static final String PROPERTY_EMBEDDED_SEPARATOR = "separator";
+    public static final String PROPERTY_STRICT_CHECK = "strictCheck"; // 
whether to check each column strictly; defaults to false (fault tolerant).
 
     public static final Map<String, String> defaultProperties = 
Maps.newHashMap();
     public static final Map<String, Integer> derivedTimeColumns = 
Maps.newHashMap();
@@ -57,7 +58,8 @@ public abstract class StreamingParser {
         defaultProperties.put(PROPERTY_TS_COLUMN_NAME, "timestamp");
         defaultProperties.put(PROPERTY_TS_PARSER, 
"org.apache.kylin.source.kafka.DefaultTimeParser");
         defaultProperties.put(PROPERTY_TS_PATTERN, 
DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS);
-        defaultProperties.put(EMBEDDED_PROPERTY_SEPARATOR, "_");
+        defaultProperties.put(PROPERTY_EMBEDDED_SEPARATOR, "_");
+        defaultProperties.put(PROPERTY_STRICT_CHECK, "false");
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kylin/blob/5075f590/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index de167b4..3618ba6 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -52,8 +52,9 @@ import com.google.common.collect.Lists;
  * By default it will parse the timestamp col value as Unix time. If the 
format isn't Unix time, need specify the time parser
  * with property StreamingParser#PROPERTY_TS_PARSER.
  * <p>
- * It also support embedded JSON format; Use a separator (customized by 
StreamingParser#EMBEDDED_PROPERTY_SEPARATOR) to concat
- * the property names.
+ * It also supports embedded JSON format: the nested property names are joined with
+ * TimedJsonStreamParser#EMBEDDED_PROPERTY_SEPARATOR and saved into the column's "comment" field. For example, in
+ * "{ 'user' : { 'first_name': 'Tom'}}" the 'first_name' property is mapped to the 'user_first_name' column, whose comment value is 'user|first_name'.
  */
 public final class TimedJsonStreamParser extends StreamingParser {
 
@@ -64,13 +65,17 @@ public final class TimedJsonStreamParser extends 
StreamingParser {
     private String tsColName = null;
     private String tsParser = null;
     private String separator = null;
+    private boolean strictCheck = true;
     private final Map<String, Object> root = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
     private final Map<String, Object> tempMap = new 
TreeMap<>(String.CASE_INSENSITIVE_ORDER);
     private final Map<String, String[]> nameMap = new HashMap<>();
+    public static final String EMBEDDED_PROPERTY_SEPARATOR = "|";
 
     private final JavaType mapType = MapType.construct(HashMap.class, 
SimpleType.construct(String.class), SimpleType.construct(Object.class));
 
     private AbstractTimeParser streamTimeParser;
+    
+    private long vcounter = 0;
 
     public TimedJsonStreamParser(List<TblColRef> allColumns, Map<String, 
String> properties) {
         this.allColumns = allColumns;
@@ -80,7 +85,8 @@ public final class TimedJsonStreamParser extends 
StreamingParser {
 
         tsColName = properties.get(PROPERTY_TS_COLUMN_NAME);
         tsParser = properties.get(PROPERTY_TS_PARSER);
-        separator = properties.get(EMBEDDED_PROPERTY_SEPARATOR);
+        separator = properties.get(PROPERTY_EMBEDDED_SEPARATOR);
+        strictCheck = 
Boolean.parseBoolean(properties.get(PROPERTY_STRICT_CHECK));
 
         if (!StringUtils.isEmpty(tsParser)) {
             try {
@@ -112,7 +118,7 @@ public final class TimedJsonStreamParser extends 
StreamingParser {
             for (TblColRef column : allColumns) {
                 final String columnName = column.getName().toLowerCase();
                 if (populateDerivedTimeColumns(columnName, result, t) == 
false) {
-                    result.add(getValueByKey(columnName, root));
+                    result.add(getValueByKey(column, root));
                 }
             }
 
@@ -131,16 +137,30 @@ public final class TimedJsonStreamParser extends 
StreamingParser {
         return true;
     }
 
-    protected String getValueByKey(String key, Map<String, Object> rootMap) 
throws IOException {
+    public String[] getEmbeddedPropertyNames(TblColRef column) {
+        final String colName = column.getName().toLowerCase();
+        String[] names = nameMap.get(colName);
+        if (names == null) {
+            String comment = column.getColumnDesc().getComment(); // use 
comment to parse the structure
+            if (!StringUtils.isEmpty(comment) && 
comment.contains(EMBEDDED_PROPERTY_SEPARATOR)) {
+                names = comment.toLowerCase().split("\\" + 
EMBEDDED_PROPERTY_SEPARATOR);
+                nameMap.put(colName, names);
+            } else if (colName.contains(separator)) { // deprecated, just be 
compatible with old versions
+                names = colName.toLowerCase().split(separator);
+                nameMap.put(colName, names);
+            }
+        }
+
+        return names;
+    }
+
+    protected String getValueByKey(TblColRef column, Map<String, Object> 
rootMap) throws IOException {
+        final String key = column.getName().toLowerCase();
         if (rootMap.containsKey(key)) {
             return objToString(rootMap.get(key));
         }
 
-        String[] names = nameMap.get(key);
-        if (names == null && key.contains(separator)) {
-            names = key.toLowerCase().split(separator);
-            nameMap.put(key, names);
-        }
+        String[] names = getEmbeddedPropertyNames(column);
 
         if (names != null && names.length > 0) {
             tempMap.clear();
@@ -150,8 +170,11 @@ public final class TimedJsonStreamParser extends 
StreamingParser {
                 if (o instanceof Map) {
                     tempMap.clear();
                     tempMap.putAll((Map<String, Object>) o);
-                } else {
-                    throw new IOException("Property '" + names[i] + "' is not 
embedded format");
+                } else if (strictCheck || vcounter++ % 100 == 0) {
+                    final String msg = "Property '" + names[i] + "' value is 
not embedded JSON format. ";
+                    logger.warn(msg);
+                    if (strictCheck)
+                        throw new IOException(msg);
                 }
             }
             Object finalObject = tempMap.get(names[names.length - 1]);

http://git-wip-us.apache.org/repos/asf/kylin/blob/5075f590/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
 
b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
index 8dc840b..b8aa7f3 100644
--- 
a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
+++ 
b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
@@ -43,9 +43,11 @@ import com.fasterxml.jackson.databind.type.SimpleType;
 public class TimedJsonStreamParserTest extends LocalFileMetadataTestCase {
 
     private static String[] userNeedColNames;
+    private static String[] userNeedColNamesComment;
     private static final String jsonFilePath = 
"src/test/resources/message.json";
     private static ObjectMapper mapper;
-    private final JavaType mapType = MapType.construct(HashMap.class, 
SimpleType.construct(String.class), SimpleType.construct(Object.class));
+    private final JavaType mapType = MapType.construct(HashMap.class, 
SimpleType.construct(String.class),
+            SimpleType.construct(Object.class));
 
     @BeforeClass
     public static void setUp() throws Exception {
@@ -75,8 +77,11 @@ public class TimedJsonStreamParserTest extends 
LocalFileMetadataTestCase {
 
     @Test
     public void testEmbeddedValue() throws Exception {
-        userNeedColNames = new String[] { "user_id", "user_description", 
"user_isProtected" };
-        List<TblColRef> allCol = mockupTblColRefList();
+        userNeedColNames = new String[] { "user_id", "user_description", 
"user_isProtected",
+                "user_is_Default_Profile_Image" };
+        userNeedColNamesComment = new String[] { "", "", "",
+                "user" + TimedJsonStreamParser.EMBEDDED_PROPERTY_SEPARATOR + 
"is_Default_Profile_Image" };
+        List<TblColRef> allCol = 
mockupTblColRefListWithComment(userNeedColNamesComment);
         TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
         Object msg = mapper.readValue(new File(jsonFilePath), mapType);
         ByteBuffer buffer = getJsonByteBuffer(msg);
@@ -85,6 +90,21 @@ public class TimedJsonStreamParserTest extends 
LocalFileMetadataTestCase {
         assertEquals("4853763947", result.get(0));
         assertEquals("Noticias", result.get(1));
         assertEquals("false", result.get(2));
+        assertEquals("false", result.get(3));
+    }
+
+    @Test
+    public void testEmbeddedValueFaultTolerant() throws Exception {
+        userNeedColNames = new String[] { "user_id", "nonexisted_description" 
};
+        userNeedColNamesComment = new String[] { "", "" };
+        List<TblColRef> allCol = mockupTblColRefList();
+        TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
+        Object msg = mapper.readValue(new File(jsonFilePath), mapType);
+        ByteBuffer buffer = getJsonByteBuffer(msg);
+        List<StreamingMessageRow> msgList = parser.parse(buffer);
+        List<String> result = msgList.get(0).getData();
+        assertEquals("4853763947", result.get(0));
+        assertEquals(StringUtils.EMPTY, result.get(1));
     }
 
     @Test
@@ -143,4 +163,14 @@ public class TimedJsonStreamParserTest extends 
LocalFileMetadataTestCase {
         }
         return list;
     }
+
+    private static List<TblColRef> mockupTblColRefListWithComment(String[] 
comments) {
+        TableDesc t = TableDesc.mockup("table_a");
+        List<TblColRef> list = new ArrayList<>();
+        for (int i = 0; i < userNeedColNames.length; i++) {
+            TblColRef c = TblColRef.mockup(t, i, userNeedColNames[i], 
"string", comments[i]);
+            list.add(c);
+        }
+        return list;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/5075f590/source-kafka/src/test/resources/message.json
----------------------------------------------------------------------
diff --git a/source-kafka/src/test/resources/message.json 
b/source-kafka/src/test/resources/message.json
index 55f35d3..27183eb 100644
--- a/source-kafka/src/test/resources/message.json
+++ b/source-kafka/src/test/resources/message.json
@@ -36,7 +36,7 @@
   "user": {
     "id": 4853763947,
     "description": "Noticias",
-    "isDefaultProfileImage": false,
+    "is_Default_Profile_Image": false,
     "isProtected": false
   }
 }
\ No newline at end of file

Reply via email to