Repository: incubator-eagle Updated Branches: refs/heads/master c629ca1eb -> 91643e84d
[EAGLE-592] add MessageJsonScheme which extracts audit log from a json formatted … https://issues.apache.org/jira/browse/EAGLE-592 Author: Zhao, Qingwen <qingwz...@ebay.com> Closes #477 from qingwen220/EAGLE-592. Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/91643e84 Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/91643e84 Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/91643e84 Branch: refs/heads/master Commit: 91643e84dab0f21916b47a552c0e02b277ee05f8 Parents: c629ca1 Author: Zhao, Qingwen <qingwz...@ebay.com> Authored: Sat Oct 8 15:53:36 2016 +0800 Committer: Zhao, Qingwen <qingwz...@ebay.com> Committed: Sat Oct 8 15:53:36 2016 +0800 ---------------------------------------------------------------------- .../auditlog/kafka/MessageJsonScheme.java | 68 ++++++++++++++++++++ 1 file changed, 68 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/91643e84/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java ---------------------------------------------------------------------- diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java new file mode 100644 index 0000000..9ffcaf9 --- /dev/null +++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.eagle.security.auditlog.kafka; + +import backtype.storm.spout.Scheme; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Values; +import org.codehaus.jackson.map.ObjectMapper; +import org.slf4j.Logger; +import storm.kafka.StringScheme; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +public class MessageJsonScheme implements Scheme { + + private static final Logger LOG = org.slf4j.LoggerFactory.getLogger(MessageJsonScheme.class); + private static final ObjectMapper mapper = new ObjectMapper(); + + public static final String MESSAGE_SCHEME_KEY = "message"; + + @Override + public Fields getOutputFields() { + return new Fields(StringScheme.STRING_SCHEME_KEY); + } + + @Override + @SuppressWarnings("rawtypes") + public List<Object> deserialize(byte[] ser) { + try { + if (ser != null) { + Map map = mapper.readValue(ser, Map.class); + Object message = map.get(MESSAGE_SCHEME_KEY); + if (message != null) { + return new Values(map.get(MESSAGE_SCHEME_KEY)); + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Content is null, ignore"); + } + } + } catch (IOException e) { + try { + LOG.error("Failed to deserialize as JSON: {}", new String(ser, "UTF-8"), e); + } catch (Exception ex) { + LOG.error(ex.getMessage(), ex); + } + } + return null; + } +} \ No newline at end of file