KYLIN-2596 Enable generating multiple streaming messages with one input message in streaming parser

* Minor: remove unused imports.

* Enable generating multiple streaming messages with one input message in the streaming parser.

* Allow MR input to generate multiple rows of data from one mapper record.

* When a record yields multiple rows, outputKV() is called once per row.

* Wrap each row's processing in its own try/catch, so one bad row does not abort the whole mapper record.
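
To illustrate the new contract, here is a minimal sketch (not part of this commit) of a parser that splits one Kafka message holding a JSON array into several StreamingMessageRow objects. The class name, the assumed JSON layout, and the field handling are hypothetical; only the List<StreamingMessageRow> return type, the filter(StreamingMessageRow) signature, and the StreamingMessageRow constructor are taken from this change.

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

import org.apache.kylin.common.util.ByteBufferBackedInputStream;
import org.apache.kylin.common.util.StreamingMessageRow;
import org.apache.kylin.source.kafka.StreamingParser;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// Hypothetical example only: splits one message containing a JSON array into
// one StreamingMessageRow per array element.
public class MultiRowJsonStreamParser extends StreamingParser {

    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public List<StreamingMessageRow> parse(ByteBuffer buffer) {
        List<StreamingMessageRow> rows = new ArrayList<>();
        try {
            // Assumption: the message body is a JSON array of flat objects.
            JsonNode array = mapper.readTree(new ByteBufferBackedInputStream(buffer));
            for (JsonNode element : array) {
                List<String> data = new ArrayList<>();
                for (Iterator<JsonNode> it = element.elements(); it.hasNext();) {
                    data.add(it.next().asText());
                }
                // Assumption: each element carries its event time in a "timestamp" field.
                long t = element.path("timestamp").asLong(0L);
                rows.add(new StreamingMessageRow(data, 0, t, Collections.<String, Object> emptyMap()));
            }
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return rows;
    }

    @Override
    public boolean filter(StreamingMessageRow streamingMessageRow) {
        return true;
    }
}

TimedJsonStreamParser itself still produces exactly one row per message and wraps it in a singleton list, which is why DeployUtil and the tests in the diff below read the first element via get(0).getData().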


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/edc4d4cc
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/edc4d4cc
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/edc4d4cc

Branch: refs/heads/master
Commit: edc4d4cc558473476b18d48f232635e44640c27a
Parents: 7c0038d
Author: nichunen <zjsy...@sjtu.org>
Authored: Fri May 12 14:55:14 2017 +0800
Committer: hongbin ma <m...@kyligence.io>
Committed: Fri May 12 14:55:14 2017 +0800

----------------------------------------------------------------------
 .../java/org/apache/kylin/job/DeployUtil.java   |   2 +-
 .../kylin/common/util/StreamingMessage.java     |  62 ----------
 .../kylin/common/util/StreamingMessageRow.java  |  62 ++++++++++
 .../org/apache/kylin/engine/mr/IMRInput.java    |   4 +-
 .../mr/steps/FactDistinctColumnsMapper.java     | 113 ++++++++++---------
 .../engine/mr/steps/HiveToBaseCuboidMapper.java |  15 ++-
 .../engine/mr/steps/InMemCuboidMapper.java      |  15 ++-
 .../apache/kylin/source/hive/HiveMRInput.java   |   6 +-
 .../cardinality/ColumnCardinalityMapper.java    |  25 ++--
 .../apache/kylin/source/kafka/KafkaMRInput.java |  16 ++-
 .../kylin/source/kafka/StreamingParser.java     |   8 +-
 .../source/kafka/TimedJsonStreamParser.java     |  11 +-
 .../source/kafka/TimedJsonStreamParserTest.java |  22 ++--
 13 files changed, 194 insertions(+), 167 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java 
b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index fdcd52c..077c056 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -166,7 +166,7 @@ public class DeployUtil {
         TimedJsonStreamParser timedJsonStreamParser = new 
TimedJsonStreamParser(tableColumns, null);
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
-            List<String> rowColumns = 
timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).getData();
+            List<String> rowColumns = 
timedJsonStreamParser.parse(ByteBuffer.wrap(json.getBytes())).get(0).getData();
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java 
b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
deleted file mode 100644
index 981c8a8..0000000
--- 
a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.common.util;
-
-import java.util.List;
-import java.util.Map;
-
-/**
- */
-public class StreamingMessage {
-
-    private final List<String> data;
-
-    private long offset;
-
-    private long timestamp;
-
-    private Map<String, Object> params;
-
-    public StreamingMessage(List<String> data, long offset, long timestamp, 
Map<String, Object> params) {
-        this.data = data;
-        this.offset = offset;
-        this.timestamp = timestamp;
-        this.params = params;
-    }
-
-    public final List<String> getData() {
-        return data;
-    }
-
-    public final long getOffset() {
-        return offset;
-    }
-
-    public void setOffset(long offset) {
-        this.offset = offset;
-    }
-
-    public final long getTimestamp() {
-        return timestamp;
-    }
-
-    public Map<String, Object> getParams() {
-        return params;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessageRow.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessageRow.java
 
b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessageRow.java
new file mode 100644
index 0000000..9b287d4
--- /dev/null
+++ 
b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessageRow.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common.util;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ */
+public class StreamingMessageRow {
+
+    private final List<String> data;
+
+    private long offset;
+
+    private long timestamp;
+
+    private Map<String, Object> params;
+
+    public StreamingMessageRow(List<String> data, long offset, long timestamp, 
Map<String, Object> params) {
+        this.data = data;
+        this.offset = offset;
+        this.timestamp = timestamp;
+        this.params = params;
+    }
+
+    public final List<String> getData() {
+        return data;
+    }
+
+    public final long getOffset() {
+        return offset;
+    }
+
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
+    public final long getTimestamp() {
+        return timestamp;
+    }
+
+    public Map<String, Object> getParams() {
+        return params;
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index 10d4879..6b0e557 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -24,6 +24,8 @@ import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
 
+import java.util.Collection;
+
 /**
  * Any ITableSource that wishes to serve as input of MapReduce build engine 
must adapt to this interface.
  */
@@ -50,7 +52,7 @@ public interface IMRInput {
         public void configureJob(Job job);
 
         /** Parse a mapper input object into column values. */
-        public String[] parseMapperInput(Object mapperInput);
+        public Collection<String[]> parseMapperInput(Object mapperInput);
     }
 
     /**
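
For context, an input format may now emit more than one flat-table row per mapper record. A hypothetical sketch (not part of this commit), assuming each record packs several newline-separated, tab-delimited rows; only the Collection<String[]> contract and the IMRTableInputFormat method names come from the interface above:

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;

// Hypothetical example only: one mapper record carries several rows, one per
// line, each line being a tab-delimited flat-table row.
public class MultiRowTextInputFormat implements IMRTableInputFormat {

    @Override
    public void configureJob(Job job) {
        // Job wiring (input format class, paths) omitted; depends on the source.
    }

    @Override
    public Collection<String[]> parseMapperInput(Object mapperInput) {
        Text text = (Text) mapperInput;
        List<String[]> rows = new ArrayList<>();
        for (String line : text.toString().split("\n")) {
            rows.add(line.split("\t", -1));
        }
        return rows;
    }
}

Single-row sources simply return a singleton collection, as HiveMRInput does further down.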

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
index d36ae18..713b7f7 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsMapper.java
@@ -44,6 +44,7 @@ import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
 
 
+
 /**
  */
 public class FactDistinctColumnsMapper<KEYIN> extends 
FactDistinctColumnsMapperBase<KEYIN, Object> {
@@ -157,70 +158,72 @@ public class FactDistinctColumnsMapper<KEYIN> extends 
FactDistinctColumnsMapperB
 
     @Override
     public void doMap(KEYIN key, Object record, Context context) throws 
IOException, InterruptedException {
-        String[] row = flatTableInputFormat.parseMapperInput(record);
-
-        
context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
-        for (int i = 0; i < factDictCols.size(); i++) {
-            String fieldValue = row[dictionaryColumnIndex[i]];
-            if (fieldValue == null)
-                continue;
-
-            int reducerIndex;
-            if (uhcIndex[i] == 0) {
-                //for the normal dictionary column
-                reducerIndex = columnIndexToReducerBeginId.get(i);
-            } else {
-                //for the uhc
-                reducerIndex = columnIndexToReducerBeginId.get(i) + 
(fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
-            }
+        Collection<String[]> rowCollection = 
flatTableInputFormat.parseMapperInput(record);
+
+        for (String[] row: rowCollection) {
+            
context.getCounter(RawDataCounter.BYTES).increment(countSizeInBytes(row));
+            for (int i = 0; i < factDictCols.size(); i++) {
+                String fieldValue = row[dictionaryColumnIndex[i]];
+                if (fieldValue == null)
+                    continue;
+
+                int reducerIndex;
+                if (uhcIndex[i] == 0) {
+                    //for the normal dictionary column
+                    reducerIndex = columnIndexToReducerBeginId.get(i);
+                } else {
+                    //for the uhc
+                    reducerIndex = columnIndexToReducerBeginId.get(i) + 
(fieldValue.hashCode() & 0x7fffffff) % uhcReducerCount;
+                }
 
-            tmpbuf.clear();
-            byte[] valueBytes = Bytes.toBytes(fieldValue);
-            int size = valueBytes.length + 1;
-            if (size >= tmpbuf.capacity()) {
-                tmpbuf = ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), 
size));
-            }
-            tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
-            tmpbuf.put(valueBytes);
-            outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-            DataType type = factDictCols.get(i).getType();
-            sortableKey.init(outputKey, type);
-            //judge type
-            context.write(sortableKey, EMPTY_TEXT);
-
-            // log a few rows for troubleshooting
-            if (rowCount < 10) {
-                logger.info("Sample output: " + factDictCols.get(i) + " '" + 
fieldValue + "' => reducer " + reducerIndex);
+                tmpbuf.clear();
+                byte[] valueBytes = Bytes.toBytes(fieldValue);
+                int size = valueBytes.length + 1;
+                if (size >= tmpbuf.capacity()) {
+                    tmpbuf = 
ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+                }
+                tmpbuf.put(Bytes.toBytes(reducerIndex)[3]);
+                tmpbuf.put(valueBytes);
+                outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                DataType type = factDictCols.get(i).getType();
+                sortableKey.init(outputKey, type);
+                //judge type
+                context.write(sortableKey, EMPTY_TEXT);
+
+                // log a few rows for troubleshooting
+                if (rowCount < 10) {
+                    logger.info("Sample output: " + factDictCols.get(i) + " '" 
+ fieldValue + "' => reducer " + reducerIndex);
+                }
             }
-        }
 
-        if (collectStatistics) {
-            if (rowCount % 100 < samplingPercentage) {
-                if (isUsePutRowKeyToHllNewAlgorithm) {
-                    putRowKeyToHLLNew(row);
-                } else {
-                    putRowKeyToHLLOld(row);
+            if (collectStatistics) {
+                if (rowCount % 100 < samplingPercentage) {
+                    if (isUsePutRowKeyToHllNewAlgorithm) {
+                        putRowKeyToHLLNew(row);
+                    } else {
+                        putRowKeyToHLLOld(row);
+                    }
                 }
-            }
 
-            if (needFetchPartitionCol == true) {
-                String fieldValue = row[partitionColumnIndex];
-                if (fieldValue != null) {
-                    tmpbuf.clear();
-                    byte[] valueBytes = Bytes.toBytes(fieldValue);
-                    int size = valueBytes.length + 1;
-                    if (size >= tmpbuf.capacity()) {
-                        tmpbuf = 
ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+                if (needFetchPartitionCol == true) {
+                    String fieldValue = row[partitionColumnIndex];
+                    if (fieldValue != null) {
+                        tmpbuf.clear();
+                        byte[] valueBytes = Bytes.toBytes(fieldValue);
+                        int size = valueBytes.length + 1;
+                        if (size >= tmpbuf.capacity()) {
+                            tmpbuf = 
ByteBuffer.allocate(countNewSize(tmpbuf.capacity(), size));
+                        }
+                        tmpbuf.put(MARK_FOR_PARTITION_COL);
+                        tmpbuf.put(valueBytes);
+                        outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
+                        sortableKey.init(outputKey, (byte) 0);
+                        context.write(sortableKey, EMPTY_TEXT);
                     }
-                    tmpbuf.put(MARK_FOR_PARTITION_COL);
-                    tmpbuf.put(valueBytes);
-                    outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
-                    sortableKey.init(outputKey, (byte) 0);
-                    context.write(sortableKey, EMPTY_TEXT);
                 }
             }
+            rowCount++;
         }
-        rowCount++;
     }
 
     private long countSizeInBytes(String[] row) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
index 428f878..a04fb43 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/HiveToBaseCuboidMapper.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+import java.util.Collection;
 
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.MRUtil;
@@ -38,12 +39,14 @@ public class HiveToBaseCuboidMapper<KEYIN> extends 
BaseCuboidMapperBase<KEYIN, O
 
     @Override
     public void doMap(KEYIN key, Object value, Context context) throws 
IOException, InterruptedException {
-        String[] row = flatTableInputFormat.parseMapperInput(value);
-        try {
-            outputKV(row, context);
-
-        } catch (Exception ex) {
-            handleErrorRecord(row, ex);
+        Collection<String[]> rowCollection = 
flatTableInputFormat.parseMapperInput(value);
+        for (String[] row: rowCollection) {
+            try {
+                outputKV(row, context);
+
+            } catch (Exception ex) {
+                handleErrorRecord(row, ex);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
index c0ff2f2..eee189c 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapper.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -116,12 +117,14 @@ public class InMemCuboidMapper<KEYIN> extends 
KylinMapper<KEYIN, Object, ByteArr
     @Override
     public void doMap(KEYIN key, Object record, Context context) throws 
IOException, InterruptedException {
         // put each row to the queue
-        String[] row = flatTableInputFormat.parseMapperInput(record);
-        List<String> rowAsList = Arrays.asList(row);
-
-        while (!future.isDone()) {
-            if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
-                break;
+        Collection<String[]> rowCollection = 
flatTableInputFormat.parseMapperInput(record);
+
+        for(String[] row: rowCollection) {
+            List<String> rowAsList = Arrays.asList(row);
+            while (!future.isDone()) {
+                if (queue.offer(rowAsList, 1, TimeUnit.SECONDS)) {
+                    break;
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 2f348a0..d7a2c7e 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -19,6 +19,8 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
@@ -118,8 +120,8 @@ public class HiveMRInput implements IMRInput {
         }
 
         @Override
-        public String[] parseMapperInput(Object mapperInput) {
-            return HiveTableReader.getRowAsStringArray((HCatRecord) 
mapperInput);
+        public List<String[]> parseMapperInput(Object mapperInput) {
+            return 
Collections.singletonList(HiveTableReader.getRowAsStringArray((HCatRecord) 
mapperInput));
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
----------------------------------------------------------------------
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
index c712605..9033d67 100644
--- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/cardinality/ColumnCardinalityMapper.java
@@ -20,6 +20,7 @@ package org.apache.kylin.source.hive.cardinality;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -68,22 +69,24 @@ public class ColumnCardinalityMapper<T> extends 
KylinMapper<T, Object, IntWritab
     @Override
     public void doMap(T key, Object value, Context context) throws 
IOException, InterruptedException {
         ColumnDesc[] columns = tableDesc.getColumns();
-        String[] values = tableInputFormat.parseMapperInput(value);
+        Collection<String[]> valuesCollection = 
tableInputFormat.parseMapperInput(value);
 
-        for (int m = 0; m < columns.length; m++) {
-            String field = columns[m].getName();
-            String fieldValue = values[m];
-            if (fieldValue == null)
-                fieldValue = "NULL";
+        for (String[] values: valuesCollection) {
+            for (int m = 0; m < columns.length; m++) {
+                String field = columns[m].getName();
+                String fieldValue = values[m];
+                if (fieldValue == null)
+                    fieldValue = "NULL";
 
-            if (counter < 5 && m < 10) {
-                System.out.println("Get row " + counter + " column '" + field 
+ "'  value: " + fieldValue);
+                if (counter < 5 && m < 10) {
+                    System.out.println("Get row " + counter + " column '" + 
field + "'  value: " + fieldValue);
+                }
+
+                getHllc(m).add(Bytes.toBytes(fieldValue.toString()));
             }
 
-            getHllc(m).add(Bytes.toBytes(fieldValue.toString()));
+            counter++;
         }
-
-        counter++;
     }
 
     private HLLCounter getHllc(Integer key) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index c7b327f..500e1e9 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -19,7 +19,9 @@ package org.apache.kylin.source.kafka;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
 import javax.annotation.Nullable;
@@ -32,7 +34,7 @@ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessageRow;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput;
@@ -127,11 +129,17 @@ public class KafkaMRInput implements IMRInput {
         }
 
         @Override
-        public String[] parseMapperInput(Object mapperInput) {
+        public Collection<String[]> parseMapperInput(Object mapperInput) {
             Text text = (Text) mapperInput;
             ByteBuffer buffer = ByteBuffer.wrap(text.getBytes(), 0, 
text.getLength());
-            StreamingMessage streamingMessage = streamingParser.parse(buffer);
-            return streamingMessage.getData().toArray(new 
String[streamingMessage.getData().size()]);
+            List<StreamingMessageRow>  streamingMessageRowList = 
streamingParser.parse(buffer);
+            List<String[]> parsedDataCollection = new ArrayList<>();
+
+            for (StreamingMessageRow row: streamingMessageRowList) {
+                parsedDataCollection.add(row.getData().toArray(new 
String[row.getData().size()]));
+            }
+
+            return parsedDataCollection;
         }
 
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 75f9c4b..2e3c11c 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -26,7 +26,7 @@ import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.DateFormat;
-import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessageRow;
 import org.apache.kylin.common.util.TimeUtil;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -62,11 +62,11 @@ public abstract class StreamingParser {
 
     /**
      * @param message
-     * @return StreamingMessage must not be NULL
+     * @return List<StreamingMessageRow> must not be NULL
      */
-    abstract public StreamingMessage parse(ByteBuffer message);
+    abstract public List<StreamingMessageRow> parse(ByteBuffer message);
 
-    abstract public boolean filter(StreamingMessage streamingMessage);
+    abstract public boolean filter(StreamingMessageRow streamingMessageRow);
 
     public static StreamingParser getStreamingParser(String parserName, String 
parserProperties, List<TblColRef> columns) throws ReflectiveOperationException {
         if (!StringUtils.isEmpty(parserName)) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 6ff0d2f..de167b4 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -32,7 +32,7 @@ import java.util.Arrays;
 import com.fasterxml.jackson.databind.DeserializationFeature;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.ByteBufferBackedInputStream;
-import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessageRow;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -100,7 +100,7 @@ public final class TimedJsonStreamParser extends 
StreamingParser {
     }
 
     @Override
-    public StreamingMessage parse(ByteBuffer buffer) {
+    public List<StreamingMessageRow> parse(ByteBuffer buffer) {
         try {
             Map<String, Object> message = mapper.readValue(new 
ByteBufferBackedInputStream(buffer), mapType);
             root.clear();
@@ -116,7 +116,10 @@ public final class TimedJsonStreamParser extends 
StreamingParser {
                 }
             }
 
-            return new StreamingMessage(result, 0, t, Collections.<String, 
Object>emptyMap());
+            StreamingMessageRow streamingMessageRow = new 
StreamingMessageRow(result, 0, t, Collections.<String, Object>emptyMap());
+            List<StreamingMessageRow> messageRowList = new 
ArrayList<StreamingMessageRow>();
+            messageRowList.add(streamingMessageRow);
+            return messageRowList;
         } catch (IOException e) {
             logger.error("error", e);
             throw new RuntimeException(e);
@@ -124,7 +127,7 @@ public final class TimedJsonStreamParser extends 
StreamingParser {
     }
 
     @Override
-    public boolean filter(StreamingMessage streamingMessage) {
+    public boolean filter(StreamingMessageRow streamingMessageRow) {
         return true;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/edc4d4cc/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
 
b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
index 230ff00..8dc840b 100644
--- 
a/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
+++ 
b/source-kafka/src/test/java/org/apache/kylin/source/kafka/TimedJsonStreamParserTest.java
@@ -28,7 +28,7 @@ import java.util.List;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.common.util.StreamingMessage;
+import org.apache.kylin.common.util.StreamingMessageRow;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.junit.AfterClass;
@@ -65,8 +65,8 @@ public class TimedJsonStreamParserTest extends 
LocalFileMetadataTestCase {
         TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
         Object msg = mapper.readValue(new File(jsonFilePath), mapType);
         ByteBuffer buffer = getJsonByteBuffer(msg);
-        StreamingMessage sMsg = parser.parse(buffer);
-        List<String> result = sMsg.getData();
+        List<StreamingMessageRow> msgList = parser.parse(buffer);
+        List<String> result = msgList.get(0).getData();
         assertEquals("Jul 20, 2016 9:59:17 AM", result.get(0));
         assertEquals("755703618762862600", result.get(1));
         assertEquals("false", result.get(2));
@@ -80,8 +80,8 @@ public class TimedJsonStreamParserTest extends 
LocalFileMetadataTestCase {
         TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
         Object msg = mapper.readValue(new File(jsonFilePath), mapType);
         ByteBuffer buffer = getJsonByteBuffer(msg);
-        StreamingMessage sMsg = parser.parse(buffer);
-        List<String> result = sMsg.getData();
+        List<StreamingMessageRow> msgList = parser.parse(buffer);
+        List<String> result = msgList.get(0).getData();
         assertEquals("4853763947", result.get(0));
         assertEquals("Noticias", result.get(1));
         assertEquals("false", result.get(2));
@@ -96,8 +96,8 @@ public class TimedJsonStreamParserTest extends 
LocalFileMetadataTestCase {
         HashMap<String, Object> map = (HashMap<String, Object>) msg;
         Object array = map.get("mediaEntities");
         ByteBuffer buffer = getJsonByteBuffer(msg);
-        StreamingMessage sMsg = parser.parse(buffer);
-        List<String> result = sMsg.getData();
+        List<StreamingMessageRow> msgList = parser.parse(buffer);
+        List<String> result = msgList.get(0).getData();
         System.out.println(result);
 
     }
@@ -109,8 +109,8 @@ public class TimedJsonStreamParserTest extends 
LocalFileMetadataTestCase {
         TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
         Object msg = mapper.readValue(new File(jsonFilePath), mapType);
         ByteBuffer buffer = getJsonByteBuffer(msg);
-        StreamingMessage sMsg = parser.parse(buffer);
-        List<String> result = sMsg.getData();
+        List<StreamingMessageRow> msgList = parser.parse(buffer);
+        List<String> result = msgList.get(0).getData();
 
     }
 
@@ -121,8 +121,8 @@ public class TimedJsonStreamParserTest extends 
LocalFileMetadataTestCase {
         TimedJsonStreamParser parser = new TimedJsonStreamParser(allCol, null);
         Object msg = mapper.readValue(new File(jsonFilePath), mapType);
         ByteBuffer buffer = getJsonByteBuffer(msg);
-        StreamingMessage sMsg = parser.parse(buffer);
-        List<String> result = sMsg.getData();
+        List<StreamingMessageRow> msgList = parser.parse(buffer);
+        List<String> result = msgList.get(0).getData();
         assertEquals(StringUtils.EMPTY, result.get(0));
         assertEquals(StringUtils.EMPTY, result.get(1));
     }
