[
https://issues.apache.org/jira/browse/HADOOP-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17581172#comment-17581172
]
ASF GitHub Bot commented on HADOOP-18258:
-----------------------------------------
sravanigadey commented on code in PR #4383:
URL: https://github.com/apache/hadoop/pull/4383#discussion_r948693543
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.audit.AvroDataRecord;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
+
+/**
+ * Merge all the audit logs present in a directory of
+ * multiple audit log files into a single audit log file.
+ */
+public class S3AAuditLogMergerAndParser {
+
+ public static final int MAX_LINE_LENGTH = 32000;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class);
+
+ /**
+ * parseAuditLog method helps in parsing the audit log
+ * into key-value pairs using regular expressions.
+ *
+ * @param singleAuditLog this is single audit log from merged audit log file
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
of a single audit log
+ */
+ public HashMap<String, String> parseAuditLog(String singleAuditLog) {
+ HashMap<String, String> auditLogMap = new HashMap<>();
+ if (singleAuditLog == null || singleAuditLog.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return auditLogMap;
+ }
+ final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog);
+ boolean patternMatching = matcher.matches();
+ if (patternMatching) {
+ for (String key : AWS_LOG_REGEXP_GROUPS) {
+ try {
+ final String value = matcher.group(key);
+ auditLogMap.put(key, value);
+ } catch (IllegalStateException e) {
+ LOG.debug(String.valueOf(e));
+ }
+ }
+ }
+ return auditLogMap;
+ }
+
+ /**
+ * parseReferrerHeader method helps in parsing the http referrer header.
+ * which is one of the key-value pair of audit log
+ *
+ * @param referrerHeader this is the http referrer header of a particular
audit log
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
+ * of audit log as well as referrer header present in it
+ */
+ public HashMap<String, String> parseReferrerHeader(String referrerHeader) {
+ HashMap<String, String> referrerHeaderMap = new HashMap<>();
+ if (referrerHeader == null || referrerHeader.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return referrerHeaderMap;
+ }
+ int indexOfQuestionMark = referrerHeader.indexOf("?");
+ String httpReferrer = referrerHeader.substring(indexOfQuestionMark + 1,
+ referrerHeader.length() - 1);
+ int lengthOfReferrer = httpReferrer.length();
+ int start = 0;
+ while (start < lengthOfReferrer) {
+ int equals = httpReferrer.indexOf("=", start);
+ // no match : break
+ if (equals == -1) {
+ break;
+ }
+ String key = httpReferrer.substring(start, equals);
+ int end = httpReferrer.indexOf("&", equals);
+ // or end of string
+ if (end == -1) {
+ end = lengthOfReferrer;
+ }
+ String value = httpReferrer.substring(equals + 1, end);
+ referrerHeaderMap.put(key, value);
+ start = end + 1;
+ }
+ return referrerHeaderMap;
+ }
+
+ /**
+ * Merge and parse all the audit log files and convert data into avro file.
+ *
+ * @param fileSystem filesystem
+ * @param logsPath source path of logs
+ * @param destPath destination path of merged log file
+ * @return true
+ * @throws IOException on any failure
+ */
+ public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem,
+ Path logsPath,
+ Path destPath) throws IOException {
+
+ // Listing file in given path
+ RemoteIterator<LocatedFileStatus> listOfLogFiles =
+ fileSystem.listFiles(logsPath, true);
+
+ Path destFile = new Path(destPath, "AuditLogFile");
+
+ try (FSDataOutputStream fsDataOutputStream = fileSystem.create(destFile)) {
+
+ // Iterating over the list of files to merge and parse
+ while (listOfLogFiles.hasNext()) {
+ FileStatus fileStatus = listOfLogFiles.next();
+ int fileLength = (int) fileStatus.getLen();
+ byte[] byteBuffer = new byte[fileLength];
+ try (FSDataInputStream fsDataInputStream =
+ fileSystem.open(fileStatus.getPath())) {
+
+ // Instantiating generated AvroDataRecord class
+ AvroDataRecord avroDataRecord = new AvroDataRecord();
+
+ // Instantiate DatumWriter class
+ DatumWriter<AvroDataRecord> datumWriter =
+ new SpecificDatumWriter<AvroDataRecord>(AvroDataRecord.class);
+ DataFileWriter<AvroDataRecord> dataFileWriter =
+ new DataFileWriter<AvroDataRecord>(datumWriter);
+
+ List<String> longValues =
+ Arrays.asList("turnaroundtime", "bytessent", "objectsize",
Review Comment:
used S3LogParser constants
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.audit.AvroDataRecord;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
+
+/**
+ * Merge all the audit logs present in a directory of
+ * multiple audit log files into a single audit log file.
+ */
+public class S3AAuditLogMergerAndParser {
+
+ public static final int MAX_LINE_LENGTH = 32000;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class);
+
+ /**
+ * parseAuditLog method helps in parsing the audit log
+ * into key-value pairs using regular expressions.
+ *
+ * @param singleAuditLog this is single audit log from merged audit log file
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
of a single audit log
+ */
+ public HashMap<String, String> parseAuditLog(String singleAuditLog) {
+ HashMap<String, String> auditLogMap = new HashMap<>();
+ if (singleAuditLog == null || singleAuditLog.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return auditLogMap;
+ }
+ final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog);
+ boolean patternMatching = matcher.matches();
+ if (patternMatching) {
+ for (String key : AWS_LOG_REGEXP_GROUPS) {
+ try {
+ final String value = matcher.group(key);
+ auditLogMap.put(key, value);
+ } catch (IllegalStateException e) {
+ LOG.debug(String.valueOf(e));
+ }
+ }
+ }
+ return auditLogMap;
+ }
+
+ /**
+ * parseReferrerHeader method helps in parsing the http referrer header.
+ * which is one of the key-value pair of audit log
+ *
+ * @param referrerHeader this is the http referrer header of a particular
audit log
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
+ * of audit log as well as referrer header present in it
+ */
+ public HashMap<String, String> parseReferrerHeader(String referrerHeader) {
+ HashMap<String, String> referrerHeaderMap = new HashMap<>();
+ if (referrerHeader == null || referrerHeader.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return referrerHeaderMap;
+ }
+ int indexOfQuestionMark = referrerHeader.indexOf("?");
+ String httpReferrer = referrerHeader.substring(indexOfQuestionMark + 1,
+ referrerHeader.length() - 1);
+ int lengthOfReferrer = httpReferrer.length();
+ int start = 0;
+ while (start < lengthOfReferrer) {
+ int equals = httpReferrer.indexOf("=", start);
+ // no match : break
+ if (equals == -1) {
+ break;
+ }
+ String key = httpReferrer.substring(start, equals);
+ int end = httpReferrer.indexOf("&", equals);
+ // or end of string
+ if (end == -1) {
+ end = lengthOfReferrer;
+ }
+ String value = httpReferrer.substring(equals + 1, end);
+ referrerHeaderMap.put(key, value);
+ start = end + 1;
+ }
+ return referrerHeaderMap;
+ }
+
+ /**
+ * Merge and parse all the audit log files and convert data into avro file.
+ *
+ * @param fileSystem filesystem
+ * @param logsPath source path of logs
+ * @param destPath destination path of merged log file
+ * @return true
+ * @throws IOException on any failure
+ */
+ public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem,
+ Path logsPath,
+ Path destPath) throws IOException {
+
+ // Listing file in given path
+ RemoteIterator<LocatedFileStatus> listOfLogFiles =
+ fileSystem.listFiles(logsPath, true);
+
+ Path destFile = new Path(destPath, "AuditLogFile");
+
+ try (FSDataOutputStream fsDataOutputStream = fileSystem.create(destFile)) {
+
+ // Iterating over the list of files to merge and parse
+ while (listOfLogFiles.hasNext()) {
+ FileStatus fileStatus = listOfLogFiles.next();
+ int fileLength = (int) fileStatus.getLen();
+ byte[] byteBuffer = new byte[fileLength];
+ try (FSDataInputStream fsDataInputStream =
+ fileSystem.open(fileStatus.getPath())) {
+
+ // Instantiating generated AvroDataRecord class
+ AvroDataRecord avroDataRecord = new AvroDataRecord();
+
+ // Instantiate DatumWriter class
+ DatumWriter<AvroDataRecord> datumWriter =
+ new SpecificDatumWriter<AvroDataRecord>(AvroDataRecord.class);
+ DataFileWriter<AvroDataRecord> dataFileWriter =
+ new DataFileWriter<AvroDataRecord>(datumWriter);
+
+ List<String> longValues =
+ Arrays.asList("turnaroundtime", "bytessent", "objectsize",
+ "totaltime");
+
+ // Write avro data into a file in bucket destination path
+ Path avroFile = new Path(destPath, "AvroData.avro");
+
+ // Reading the file data using LineRecordReader
+ LineRecordReader lineRecordReader =
+ new LineRecordReader(fsDataInputStream, 0L, fileLength,
+ MAX_LINE_LENGTH);
+ LongWritable k = new LongWritable();
+ Text singleAuditLog = new Text();
+
+ try (FSDataOutputStream fsDataOutputStreamAvro = fileSystem.create(
+ avroFile)) {
+ // adding schema, output stream to DataFileWriter
+ dataFileWriter.create(AvroDataRecord.getClassSchema(),
+ fsDataOutputStreamAvro);
+
+ // Parse each and every audit log from list of logs
+ while (lineRecordReader.next(k, singleAuditLog)) {
+ // Parse audit log except referrer header
+ HashMap<String, String> auditLogMap =
+ parseAuditLog(singleAuditLog.toString());
+
+ String referrerHeader = auditLogMap.get("referrer");
+ if (referrerHeader == null || referrerHeader.equals("-")) {
+ LOG.debug("Log didn't parsed : {}", referrerHeader);
Review Comment:
modified
> Merging of S3A Audit Logs
> -------------------------
>
> Key: HADOOP-18258
> URL: https://issues.apache.org/jira/browse/HADOOP-18258
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Reporter: Sravani Gadey
> Assignee: Sravani Gadey
> Priority: Major
> Labels: pull-request-available
> Time Spent: 12.5h
> Remaining Estimate: 0h
>
> Merging audit log files containing huge number of audit logs collected from a
> job like Hive or Spark job containing various S3 requests like list, head,
> get and put requests.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]