Repository: incubator-eagle
Updated Branches:
  refs/heads/master c629ca1eb -> 91643e84d


[EAGLE-592] add MessageJsonScheme which extracts audit log from a json 
formatted …

https://issues.apache.org/jira/browse/EAGLE-592

Author: Zhao, Qingwen <qingwz...@ebay.com>

Closes #477 from qingwen220/EAGLE-592.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/91643e84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/91643e84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/91643e84

Branch: refs/heads/master
Commit: 91643e84dab0f21916b47a552c0e02b277ee05f8
Parents: c629ca1
Author: Zhao, Qingwen <qingwz...@ebay.com>
Authored: Sat Oct 8 15:53:36 2016 +0800
Committer: Zhao, Qingwen <qingwz...@ebay.com>
Committed: Sat Oct 8 15:53:36 2016 +0800

----------------------------------------------------------------------
 .../auditlog/kafka/MessageJsonScheme.java       | 68 ++++++++++++++++++++
 1 file changed, 68 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/91643e84/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java
new file mode 100644
index 0000000..9ffcaf9
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/kafka/MessageJsonScheme.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.auditlog.kafka;
+
+import backtype.storm.spout.Scheme;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.slf4j.Logger;
+import storm.kafka.StringScheme;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+public class MessageJsonScheme  implements Scheme {
+
+    private static final Logger LOG = 
org.slf4j.LoggerFactory.getLogger(MessageJsonScheme.class);
+    private static final ObjectMapper mapper = new ObjectMapper();
+
+    public static final String MESSAGE_SCHEME_KEY = "message";
+
+    @Override
+    public Fields getOutputFields() {
+        return new Fields(StringScheme.STRING_SCHEME_KEY);
+    }
+
+    @Override
+    @SuppressWarnings("rawtypes")
+    public List<Object> deserialize(byte[] ser) {
+        try {
+            if (ser != null) {
+                Map map = mapper.readValue(ser, Map.class);
+                Object message = map.get(MESSAGE_SCHEME_KEY);
+                if (message != null) {
+                    return new Values(map.get(MESSAGE_SCHEME_KEY));
+                }
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Content is null, ignore");
+                }
+            }
+        } catch (IOException e) {
+            try {
+                LOG.error("Failed to deserialize as JSON: {}", new String(ser, 
"UTF-8"), e);
+            } catch (Exception ex) {
+                LOG.error(ex.getMessage(), ex);
+            }
+        }
+        return null;
+    }
+}
\ No newline at end of file

Reply via email to