KYLIN-2054 TimedJsonStreamParser should support other time format
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e5007261 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e5007261 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e5007261 Branch: refs/heads/master-cdh5.7 Commit: e500726184b318da2d1f503cd1b159cfe7242347 Parents: 01d5670 Author: shaofengshi <shaofeng...@apache.org> Authored: Tue Sep 27 22:21:15 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Sep 28 17:00:46 2016 +0800 ---------------------------------------------------------------------- build/bin/kylin.sh | 3 +- .../org/apache/kylin/common/util/BasicTest.java | 13 +++ source-kafka/pom.xml | 6 +- .../kylin/source/kafka/AbstractTimeParser.java | 34 ++++++++ .../kylin/source/kafka/DateTimeParser.java | 84 ++++++++++++++++++++ .../kylin/source/kafka/DefaultTimeParser.java | 49 ++++++++++++ .../apache/kylin/source/kafka/KafkaMRInput.java | 2 +- .../kylin/source/kafka/SeekOffsetStep.java | 2 +- .../source/kafka/TimedJsonStreamParser.java | 46 ++++++++--- 9 files changed, 222 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/build/bin/kylin.sh ---------------------------------------------------------------------- diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh index 7a9d2a1..e767492 100644 --- a/build/bin/kylin.sh +++ b/build/bin/kylin.sh @@ -31,7 +31,7 @@ function retrieveDependency() { #retrive $hive_dependency and $hbase_dependency source ${dir}/find-hive-dependency.sh source ${dir}/find-hbase-dependency.sh - source ${dir}/find-kafka-dependency.sh + #source ${dir}/find-kafka-dependency.sh #retrive $KYLIN_EXTRA_START_OPTS if [ -f "${dir}/setenv.sh" ] @@ -40,6 +40,7 @@ function retrieveDependency() { export HBASE_CLASSPATH_PREFIX=${KYLIN_HOME}/conf:${KYLIN_HOME}/lib/*:${KYLIN_HOME}/tool/*:${KYLIN_HOME}/ext/*:${HBASE_CLASSPATH_PREFIX} export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${hive_dependency} + #export HBASE_CLASSPATH=${HBASE_CLASSPATH}:${hive_dependency}:${kafka_dependency} } # start command http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java index ee15832..5eaa011 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/BasicTest.java @@ -24,6 +24,7 @@ import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Calendar; +import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.concurrent.ExecutionException; @@ -33,6 +34,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.commons.lang3.time.FastDateFormat; import org.junit.Ignore; import org.junit.Test; import org.slf4j.LoggerFactory; @@ -206,6 +208,17 @@ public class BasicTest { } } + @Test + @Ignore("for dev only") + public void test3() throws Exception { + FastDateFormat formatter = org.apache.kylin.common.util.DateFormat.getDateFormat("MMM dd, yyyy hh:mm:ss aa"); + System.out.println(formatter.format(new Date())); + + String timeStr = "Jul 20, 2016 9:59:17 AM"; + + System.out.println(formatter.parse(timeStr).getTime()); + } + private static String time(long t) { DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); Calendar cal = Calendar.getInstance(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/pom.xml ---------------------------------------------------------------------- diff --git a/source-kafka/pom.xml b/source-kafka/pom.xml index 212f4c6..f91ab8f 100644 --- a/source-kafka/pom.xml +++ b/source-kafka/pom.xml @@ -48,7 +48,11 @@ <artifactId>kafka_2.10</artifactId> <scope>provided</scope> </dependency> - + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-lang3</artifactId> + <version>${commons-lang3.version}</version> + </dependency> <!-- Env & Test --> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java new file mode 100644 index 0000000..96a4ece --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/AbstractTimeParser.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.source.kafka; + +/** + */ +public abstract class AbstractTimeParser { + + public AbstractTimeParser(String[] properties) { + } + + /** + * Parse a string time to a long value (epoch time) + * @param time + * @return + */ + abstract public long parseTime(String time) throws IllegalArgumentException; +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java new file mode 100644 index 0000000..2bd699d --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DateTimeParser.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.source.kafka; + +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.time.FastDateFormat; +import org.apache.kylin.common.util.DateFormat; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.text.ParseException; + +/** + */ +public class DateTimeParser extends AbstractTimeParser { + + private static final Logger logger = LoggerFactory.getLogger(DateTimeParser.class); + private String tsPattern = DateFormat.DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS; + + private FastDateFormat formatter = null; + + //call by reflection + public DateTimeParser(String[] properties) { + super(properties); + for (String prop : properties) { + try { + String[] parts = prop.split("="); + if (parts.length == 2) { + switch (parts[0]) { + case "tsPattern": + this.tsPattern = parts[1]; + break; + default: + break; + } + } + } catch (Exception e) { + logger.error("Failed to parse property " + prop); + //ignore + } + } + + if (!StringUtils.isEmpty(tsPattern)) { + try { + formatter = org.apache.kylin.common.util.DateFormat.getDateFormat(tsPattern); + } catch (Throwable e) { + throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'."); + } + } else { + throw new IllegalStateException("Invalid tsPattern: '" + tsPattern + "'."); + } + } + + /** + * Parse a string time to a long value (epoch time) + * + * @param timeStr + * @return + */ + public long parseTime(String timeStr) throws IllegalArgumentException { + + try { + return formatter.parse(timeStr).getTime(); + } catch (ParseException e) { + throw new IllegalArgumentException("Invalid value : pattern: '" + tsPattern + "', value: '" + timeStr + "'" , e); + } + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java new file mode 100644 index 0000000..85f2bfa --- /dev/null +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/DefaultTimeParser.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.source.kafka; + +import org.apache.commons.lang3.StringUtils; + +/** + */ +public class DefaultTimeParser extends AbstractTimeParser { + + public DefaultTimeParser(String[] properties) { + super(properties); + } + + /** + * Parse a string time to a long value (epoch time) + * @param time + * @return + */ + public long parseTime(String time) throws IllegalArgumentException { + long t; + if (StringUtils.isEmpty(time)) { + t = 0; + } else { + try { + t = Long.valueOf(time); + } catch (NumberFormatException e) { + throw new IllegalArgumentException(e); + } + } + return t; + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java index 729719a..6358ee1 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java @@ -114,7 +114,7 @@ public class KafkaMRInput implements IMRInput { try { streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns); } catch (ReflectiveOperationException e) { - throw new IllegalArgumentException(); + throw new IllegalArgumentException(e); } } Text text = (Text) mapperInput; http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java index 9369e6f..e1282d6 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/SeekOffsetStep.java @@ -120,7 +120,7 @@ public class SeekOffsetStep extends AbstractExecutable { } catch (IOException e) { return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); } - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset); + return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed, offset start: " + totalStartOffset + ", offset end: " + totalEndOffset + ", message count: " + (totalEndOffset - totalStartOffset)); } else { CubeUpdate cubeBuilder = new CubeUpdate(cube); cubeBuilder.setToRemoveSegs(segment); http://git-wip-us.apache.org/repos/asf/kylin/blob/e5007261/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index 148ae25..2125c05 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -19,6 +19,7 @@ package org.apache.kylin.source.kafka; import java.io.IOException; +import java.lang.reflect.Constructor; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -27,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.TreeMap; +import com.fasterxml.jackson.databind.DeserializationFeature; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.metadata.model.TblColRef; @@ -47,14 +49,18 @@ public final class TimedJsonStreamParser extends StreamingParser { private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class); private List<TblColRef> allColumns; - private final ObjectMapper mapper = new ObjectMapper(); + private final ObjectMapper mapper; private String tsColName = "timestamp"; - private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(String.class)); + private String tsParser = "org.apache.kylin.source.kafka.DefaultTimeParser"; + private final JavaType mapType = MapType.construct(HashMap.class, SimpleType.construct(String.class), SimpleType.construct(Object.class)); + + private AbstractTimeParser streamTimeParser; public TimedJsonStreamParser(List<TblColRef> allColumns, String propertiesStr) { this.allColumns = allColumns; + String[] properties = null; if (!StringUtils.isEmpty(propertiesStr)) { - String[] properties = propertiesStr.split(";"); + properties = propertiesStr.split(";"); for (String prop : properties) { try { String[] parts = prop.split("="); @@ -63,6 +69,9 @@ public final class TimedJsonStreamParser extends StreamingParser { case "tsColName": this.tsColName = parts[1]; break; + case "tsParser": + this.tsParser = parts[1]; + break; default: break; } @@ -75,28 +84,39 @@ public final class TimedJsonStreamParser extends StreamingParser { } logger.info("TimedJsonStreamParser with tsColName {}", tsColName); + + if (!StringUtils.isEmpty(tsParser)) { + try { + Class clazz = Class.forName(tsParser); + Constructor constructor = clazz.getConstructor(String[].class); + streamTimeParser = (AbstractTimeParser) constructor.newInstance((Object)properties); + } catch (Exception e) { + throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + ".", e); + } + } else { + throw new IllegalStateException("Invalid StreamingConfig, tsParser " + tsParser + ", parserProperties " + propertiesStr + "."); + } + mapper = new ObjectMapper(); + mapper.disable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + mapper.disable(DeserializationFeature.FAIL_ON_INVALID_SUBTYPE); + mapper.enable(DeserializationFeature.USE_JAVA_ARRAY_FOR_JSON_ARRAY); } @Override public StreamingMessage parse(ByteBuffer buffer) { try { - Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType); - Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER); + Map<String, Object> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType); + Map<String, Object> root = new TreeMap<>(String.CASE_INSENSITIVE_ORDER); root.putAll(message); - String tsStr = root.get(tsColName); - long t; - if (StringUtils.isEmpty(tsStr)) { - t = 0; - } else { - t = Long.valueOf(tsStr); - } + String tsStr = String.valueOf(root.get(tsColName)); + long t = streamTimeParser.parseTime(tsStr); ArrayList<String> result = Lists.newArrayList(); for (TblColRef column : allColumns) { String columnName = column.getName().toLowerCase(); if (populateDerivedTimeColumns(columnName, result, t) == false) { - String x = root.get(columnName); + String x = String.valueOf(root.get(columnName)); result.add(x); } }