[
https://issues.apache.org/jira/browse/HADOOP-18258?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17576936#comment-17576936
]
ASF GitHub Bot commented on HADOOP-18258:
-----------------------------------------
steveloughran commented on code in PR #4383:
URL: https://github.com/apache/hadoop/pull/4383#discussion_r940550698
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.audit.AvroDataRecord;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
+
+/**
+ * Merge all the audit logs present in a directory of
+ * multiple audit log files into a single audit log file.
+ */
+public class S3AAuditLogMergerAndParser {
+
+ public static final int MAX_LINE_LENGTH = 32000;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class);
+
+ /**
+ * parseAuditLog method helps in parsing the audit log
+ * into key-value pairs using regular expressions.
+ *
+ * @param singleAuditLog this is single audit log from merged audit log file
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
of a single audit log
+ */
+ public HashMap<String, String> parseAuditLog(String singleAuditLog) {
+ HashMap<String, String> auditLogMap = new HashMap<>();
+ if (singleAuditLog == null || singleAuditLog.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return auditLogMap;
+ }
+ final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog);
+ boolean patternMatching = matcher.matches();
+ if (patternMatching) {
+ for (String key : AWS_LOG_REGEXP_GROUPS) {
+ try {
+ final String value = matcher.group(key);
+ auditLogMap.put(key, value);
+ } catch (IllegalStateException e) {
+ LOG.debug(String.valueOf(e));
+ }
+ }
+ }
+ return auditLogMap;
+ }
+
+ /**
+ * parseReferrerHeader method helps in parsing the http referrer header.
+ * which is one of the key-value pair of audit log
+ *
+ * @param referrerHeader this is the http referrer header of a particular
audit log
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
+ * of audit log as well as referrer header present in it
+ */
+ public HashMap<String, String> parseReferrerHeader(String referrerHeader) {
+ HashMap<String, String> referrerHeaderMap = new HashMap<>();
+ if (referrerHeader == null || referrerHeader.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return referrerHeaderMap;
+ }
+ int indexOfQuestionMark = referrerHeader.indexOf("?");
+ String httpReferrer = referrerHeader.substring(indexOfQuestionMark + 1,
+ referrerHeader.length() - 1);
+ int lengthOfReferrer = httpReferrer.length();
+ int start = 0;
+ while (start < lengthOfReferrer) {
+ int equals = httpReferrer.indexOf("=", start);
+ // no match : break
+ if (equals == -1) {
+ break;
+ }
+ String key = httpReferrer.substring(start, equals);
+ int end = httpReferrer.indexOf("&", equals);
+ // or end of string
+ if (end == -1) {
+ end = lengthOfReferrer;
+ }
+ String value = httpReferrer.substring(equals + 1, end);
+ referrerHeaderMap.put(key, value);
+ start = end + 1;
+ }
+ return referrerHeaderMap;
+ }
+
+ /**
+ * Merge and parse all the audit log files and convert data into avro file.
+ *
+ * @param fileSystem filesystem
+ * @param logsPath source path of logs
+ * @param destPath destination path of merged log file
+ * @return true
+ * @throws IOException on any failure
+ */
+ public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem,
Review Comment:
for testing it would be good to return the number of lines parsed
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/AuditTool.java:
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Arrays;
+import java.util.List;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.audit.mapreduce.S3AAuditLogMergerAndParser;
+import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import static
org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_COMMAND_ARGUMENT_ERROR;
+import static org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_FAIL;
+import static
org.apache.hadoop.service.launcher.LauncherExitCodes.EXIT_SUCCESS;
+
+/**
+ * AuditTool is a Command Line Interface.
+ * Its functionality is to parse the audit log files
+ * and generate avro file.
+ */
+public class AuditTool extends Configured implements Tool, Closeable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AuditTool.class);
+
+ private final S3AAuditLogMergerAndParser s3AAuditLogMergerAndParser =
+ new S3AAuditLogMergerAndParser();
+
+ /**
+ * Name of this tool: {@value}.
+ */
+ public static final String AUDITTOOL =
+ "org.apache.hadoop.fs.s3a.audit.AuditTool";
+
+ /**
+ * Purpose of this tool: {@value}.
+ */
+ public static final String PURPOSE =
+ "\n\nUSAGE:\nMerge, parse audit log files and convert into avro file "
+ + "for "
+ + "better "
+ + "visualization";
+
+ // Exit codes
+ private static final int SUCCESS = EXIT_SUCCESS;
+ private static final int FAILURE = EXIT_FAIL;
+ private static final int INVALID_ARGUMENT = EXIT_COMMAND_ARGUMENT_ERROR;
+
+ private static final String USAGE =
+ "bin/hadoop " + "Class" + " DestinationPath" + " SourcePath" + "\n" +
+ "bin/hadoop " + AUDITTOOL + " s3a://BUCKET" + " s3a://BUCKET" + "\n";
+
+ private PrintWriter out;
+
+ public AuditTool() {
+ super();
+ }
+
+ /**
+ * Tells us the usage of the AuditTool by commands.
+ *
+ * @return the string USAGE
+ */
+ public String getUsage() {
+ return USAGE + PURPOSE;
+ }
+
+ public String getName() {
+ return AUDITTOOL;
+ }
+
+ /**
+ * This run method in AuditTool takes source and destination path of bucket,
+ * and check if there are directories and pass these paths to merge and
+ * parse audit log files.
+ *
+ * @param args argument list
+ * @return SUCCESS i.e, '0', which is an exit code
+ * @throws Exception on any failure.
+ */
+ @Override
+ public int run(String[] args) throws Exception {
+ List<String> paths = Arrays.asList(args);
+ if (paths.isEmpty()) {
+ errorln(getUsage());
+ throw invalidArgs("No bucket specified");
+ }
+
+ // Path of audit log files
+ Path logsPath = new Path(paths.get(1));
Review Comment:
if the lengths of the paths != 2, you should do something similar to the
lines above, but with an error like "too many arguments"
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditTool.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.nio.file.Files;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+
+import static
org.apache.hadoop.fs.s3a.audit.TestS3AAuditLogMergerAndParser.SAMPLE_LOG_ENTRY;
+
+/**
+ * This will implement tests on AuditTool class.
+ */
+public class TestAuditTool extends AbstractS3ATestBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestAuditTool.class);
+
+ private final AuditTool auditTool = new AuditTool();
+
+ /**
+ * Sample directories and files to test.
+ */
+ private File sampleFile;
+ private File sampleDir;
+ private File sampleDestDir;
+
+ /**
+ * Testing run method in AuditTool class by passing source and destination
+ * paths.
+ */
+ @Test
+ public void testRun() throws Exception {
+ sampleDir = Files.createTempDirectory("sampleDir").toFile();
+ sampleFile = File.createTempFile("sampleFile", ".txt", sampleDir);
+ try (FileWriter fw = new FileWriter(sampleFile)) {
+ fw.write(SAMPLE_LOG_ENTRY);
+ fw.flush();
+ }
+ sampleDestDir = Files.createTempDirectory("sampleDestDir").toFile();
+ Path logsPath = new Path(sampleDir.toURI());
+ Path destPath = new Path(sampleDestDir.toURI());
+ String[] args = {destPath.toString(), logsPath.toString()};
+ auditTool.run(args);
+ FileSystem fileSystem = destPath.getFileSystem(getConfiguration());
+ RemoteIterator<LocatedFileStatus> listOfDestFiles =
+ fileSystem.listFiles(destPath, true);
+ while (listOfDestFiles.hasNext()) {
Review Comment:
this test would pass even if no file called AvroData.avro wasn't created.
you didn't notice it because your production code worked, but if anyone broke
that code, yetus wouldn't detect the failure
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.audit.AvroDataRecord;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
+
+/**
+ * Merge all the audit logs present in a directory of
+ * multiple audit log files into a single audit log file.
+ */
+public class S3AAuditLogMergerAndParser {
+
+ public static final int MAX_LINE_LENGTH = 32000;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class);
+
+ /**
+ * parseAuditLog method helps in parsing the audit log
+ * into key-value pairs using regular expressions.
+ *
+ * @param singleAuditLog this is single audit log from merged audit log file
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
of a single audit log
+ */
+ public HashMap<String, String> parseAuditLog(String singleAuditLog) {
+ HashMap<String, String> auditLogMap = new HashMap<>();
+ if (singleAuditLog == null || singleAuditLog.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return auditLogMap;
+ }
+ final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog);
+ boolean patternMatching = matcher.matches();
+ if (patternMatching) {
+ for (String key : AWS_LOG_REGEXP_GROUPS) {
+ try {
+ final String value = matcher.group(key);
+ auditLogMap.put(key, value);
+ } catch (IllegalStateException e) {
+ LOG.debug(String.valueOf(e));
+ }
+ }
+ }
+ return auditLogMap;
+ }
+
+ /**
+ * parseReferrerHeader method helps in parsing the http referrer header.
+ * which is one of the key-value pair of audit log
+ *
+ * @param referrerHeader this is the http referrer header of a particular
audit log
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
+ * of audit log as well as referrer header present in it
+ */
+ public HashMap<String, String> parseReferrerHeader(String referrerHeader) {
+ HashMap<String, String> referrerHeaderMap = new HashMap<>();
+ if (referrerHeader == null || referrerHeader.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return referrerHeaderMap;
+ }
+ int indexOfQuestionMark = referrerHeader.indexOf("?");
+ String httpReferrer = referrerHeader.substring(indexOfQuestionMark + 1,
+ referrerHeader.length() - 1);
+ int lengthOfReferrer = httpReferrer.length();
+ int start = 0;
+ while (start < lengthOfReferrer) {
+ int equals = httpReferrer.indexOf("=", start);
+ // no match : break
+ if (equals == -1) {
+ break;
+ }
+ String key = httpReferrer.substring(start, equals);
+ int end = httpReferrer.indexOf("&", equals);
+ // or end of string
+ if (end == -1) {
+ end = lengthOfReferrer;
+ }
+ String value = httpReferrer.substring(equals + 1, end);
+ referrerHeaderMap.put(key, value);
+ start = end + 1;
+ }
+ return referrerHeaderMap;
+ }
+
+ /**
+ * Merge and parse all the audit log files and convert data into avro file.
+ *
+ * @param fileSystem filesystem
+ * @param logsPath source path of logs
+ * @param destPath destination path of merged log file
+ * @return true
+ * @throws IOException on any failure
+ */
+ public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem,
+ Path logsPath,
+ Path destPath) throws IOException {
+
+ // Listing file in given path
+ RemoteIterator<LocatedFileStatus> listOfLogFiles =
+ fileSystem.listFiles(logsPath, true);
+
+ Path destFile = new Path(destPath, "AuditLogFile");
+
+ try (FSDataOutputStream fsDataOutputStream = fileSystem.create(destFile)) {
+
+ // Iterating over the list of files to merge and parse
+ while (listOfLogFiles.hasNext()) {
+ FileStatus fileStatus = listOfLogFiles.next();
+ int fileLength = (int) fileStatus.getLen();
+ byte[] byteBuffer = new byte[fileLength];
+ try (FSDataInputStream fsDataInputStream =
+ fileSystem.open(fileStatus.getPath())) {
+
+ // Instantiating generated AvroDataRecord class
+ AvroDataRecord avroDataRecord = new AvroDataRecord();
+
+ // Instantiate DatumWriter class
+ DatumWriter<AvroDataRecord> datumWriter =
+ new SpecificDatumWriter<AvroDataRecord>(AvroDataRecord.class);
+ DataFileWriter<AvroDataRecord> dataFileWriter =
+ new DataFileWriter<AvroDataRecord>(datumWriter);
+
+ List<String> longValues =
+ Arrays.asList("turnaroundtime", "bytessent", "objectsize",
+ "totaltime");
+
+ // Write avro data into a file in bucket destination path
+ Path avroFile = new Path(destPath, "AvroData.avro");
+
+ // Reading the file data using LineRecordReader
+ LineRecordReader lineRecordReader =
+ new LineRecordReader(fsDataInputStream, 0L, fileLength,
+ MAX_LINE_LENGTH);
+ LongWritable k = new LongWritable();
+ Text singleAuditLog = new Text();
+
+ try (FSDataOutputStream fsDataOutputStreamAvro = fileSystem.create(
+ avroFile)) {
+ // adding schema, output stream to DataFileWriter
+ dataFileWriter.create(AvroDataRecord.getClassSchema(),
+ fsDataOutputStreamAvro);
+
+ // Parse each and every audit log from list of logs
+ while (lineRecordReader.next(k, singleAuditLog)) {
+ // Parse audit log except referrer header
+ HashMap<String, String> auditLogMap =
+ parseAuditLog(singleAuditLog.toString());
+
+ String referrerHeader = auditLogMap.get("referrer");
+ if (referrerHeader == null || referrerHeader.equals("-")) {
+ LOG.debug("Log didn't parsed : {}", referrerHeader);
Review Comment:
nit "parse"
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/avro/AvroDataSchema.avsc:
##########
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+ "type" : "record", "name" : "AvroDataRecord",
Review Comment:
could you have a different name, eg. AvroS3LogEntryRecord
there's too many things called AvroDataRecord
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.audit.AvroDataRecord;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
+
+/**
+ * Merge all the audit logs present in a directory of
+ * multiple audit log files into a single audit log file.
+ */
+public class S3AAuditLogMergerAndParser {
+
+ public static final int MAX_LINE_LENGTH = 32000;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class);
+
+ /**
+ * parseAuditLog method helps in parsing the audit log
+ * into key-value pairs using regular expressions.
+ *
+ * @param singleAuditLog this is single audit log from merged audit log file
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
of a single audit log
+ */
+ public HashMap<String, String> parseAuditLog(String singleAuditLog) {
+ HashMap<String, String> auditLogMap = new HashMap<>();
+ if (singleAuditLog == null || singleAuditLog.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return auditLogMap;
+ }
+ final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog);
+ boolean patternMatching = matcher.matches();
+ if (patternMatching) {
+ for (String key : AWS_LOG_REGEXP_GROUPS) {
+ try {
+ final String value = matcher.group(key);
+ auditLogMap.put(key, value);
+ } catch (IllegalStateException e) {
+ LOG.debug(String.valueOf(e));
+ }
+ }
+ }
+ return auditLogMap;
+ }
+
+ /**
+ * parseReferrerHeader method helps in parsing the http referrer header.
+ * which is one of the key-value pair of audit log
+ *
+ * @param referrerHeader this is the http referrer header of a particular
audit log
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
+ * of audit log as well as referrer header present in it
+ */
+ public HashMap<String, String> parseReferrerHeader(String referrerHeader) {
+ HashMap<String, String> referrerHeaderMap = new HashMap<>();
+ if (referrerHeader == null || referrerHeader.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return referrerHeaderMap;
+ }
+ int indexOfQuestionMark = referrerHeader.indexOf("?");
+ String httpReferrer = referrerHeader.substring(indexOfQuestionMark + 1,
+ referrerHeader.length() - 1);
+ int lengthOfReferrer = httpReferrer.length();
+ int start = 0;
+ while (start < lengthOfReferrer) {
+ int equals = httpReferrer.indexOf("=", start);
+ // no match : break
+ if (equals == -1) {
+ break;
+ }
+ String key = httpReferrer.substring(start, equals);
+ int end = httpReferrer.indexOf("&", equals);
+ // or end of string
+ if (end == -1) {
+ end = lengthOfReferrer;
+ }
+ String value = httpReferrer.substring(equals + 1, end);
+ referrerHeaderMap.put(key, value);
+ start = end + 1;
+ }
+ return referrerHeaderMap;
+ }
+
+ /**
+ * Merge and parse all the audit log files and convert data into avro file.
+ *
+ * @param fileSystem filesystem
+ * @param logsPath source path of logs
+ * @param destPath destination path of merged log file
+ * @return true
+ * @throws IOException on any failure
+ */
+ public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem,
+ Path logsPath,
+ Path destPath) throws IOException {
+
+ // Listing file in given path
+ RemoteIterator<LocatedFileStatus> listOfLogFiles =
+ fileSystem.listFiles(logsPath, true);
+
+ Path destFile = new Path(destPath, "AuditLogFile");
+
+ try (FSDataOutputStream fsDataOutputStream = fileSystem.create(destFile)) {
+
+ // Iterating over the list of files to merge and parse
+ while (listOfLogFiles.hasNext()) {
+ FileStatus fileStatus = listOfLogFiles.next();
+ int fileLength = (int) fileStatus.getLen();
+ byte[] byteBuffer = new byte[fileLength];
+ try (FSDataInputStream fsDataInputStream =
+ fileSystem.open(fileStatus.getPath())) {
+
+ // Instantiating generated AvroDataRecord class
+ AvroDataRecord avroDataRecord = new AvroDataRecord();
+
+ // Instantiate DatumWriter class
+ DatumWriter<AvroDataRecord> datumWriter =
+ new SpecificDatumWriter<AvroDataRecord>(AvroDataRecord.class);
+ DataFileWriter<AvroDataRecord> dataFileWriter =
+ new DataFileWriter<AvroDataRecord>(datumWriter);
+
+ List<String> longValues =
+ Arrays.asList("turnaroundtime", "bytessent", "objectsize",
Review Comment:
use the constants in S3LogParser
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.audit.AvroDataRecord;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
+
+/**
+ * Merge all the audit logs present in a directory of
+ * multiple audit log files into a single audit log file.
+ */
+public class S3AAuditLogMergerAndParser {
+
+ public static final int MAX_LINE_LENGTH = 32000;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class);
+
+ /**
+ * parseAuditLog method helps in parsing the audit log
+ * into key-value pairs using regular expressions.
+ *
+ * @param singleAuditLog this is single audit log from merged audit log file
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
of a single audit log
+ */
+ public HashMap<String, String> parseAuditLog(String singleAuditLog) {
+ HashMap<String, String> auditLogMap = new HashMap<>();
+ if (singleAuditLog == null || singleAuditLog.length() == 0) {
+ LOG.debug(
Review Comment:
i think we should log at info here. If we find it happens enough to fill the
logs, people can turn it off
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.audit.AvroDataRecord;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
+
+/**
+ * Merge all the audit logs present in a directory of
+ * multiple audit log files into a single audit log file.
+ */
+public class S3AAuditLogMergerAndParser {
+
+ public static final int MAX_LINE_LENGTH = 32000;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class);
+
+ /**
+ * parseAuditLog method helps in parsing the audit log
+ * into key-value pairs using regular expressions.
+ *
+ * @param singleAuditLog this is single audit log from merged audit log file
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
of a single audit log
+ */
+ public HashMap<String, String> parseAuditLog(String singleAuditLog) {
+ HashMap<String, String> auditLogMap = new HashMap<>();
+ if (singleAuditLog == null || singleAuditLog.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return auditLogMap;
+ }
+ final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog);
+ boolean patternMatching = matcher.matches();
+ if (patternMatching) {
+ for (String key : AWS_LOG_REGEXP_GROUPS) {
+ try {
+ final String value = matcher.group(key);
+ auditLogMap.put(key, value);
+ } catch (IllegalStateException e) {
+ LOG.debug(String.valueOf(e));
+ }
+ }
+ }
+ return auditLogMap;
+ }
+
+ /**
+ * parseReferrerHeader method helps in parsing the http referrer header.
+ * which is one of the key-value pair of audit log
+ *
+ * @param referrerHeader this is the http referrer header of a particular
audit log
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
+ * of audit log as well as referrer header present in it
+ */
+ public HashMap<String, String> parseReferrerHeader(String referrerHeader) {
+ HashMap<String, String> referrerHeaderMap = new HashMap<>();
+ if (referrerHeader == null || referrerHeader.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return referrerHeaderMap;
+ }
+ int indexOfQuestionMark = referrerHeader.indexOf("?");
+ String httpReferrer = referrerHeader.substring(indexOfQuestionMark + 1,
+ referrerHeader.length() - 1);
+ int lengthOfReferrer = httpReferrer.length();
+ int start = 0;
+ while (start < lengthOfReferrer) {
+ int equals = httpReferrer.indexOf("=", start);
+ // no match : break
+ if (equals == -1) {
+ break;
+ }
+ String key = httpReferrer.substring(start, equals);
+ int end = httpReferrer.indexOf("&", equals);
+ // or end of string
+ if (end == -1) {
+ end = lengthOfReferrer;
+ }
+ String value = httpReferrer.substring(equals + 1, end);
+ referrerHeaderMap.put(key, value);
+ start = end + 1;
+ }
+ return referrerHeaderMap;
+ }
+
+ /**
+ * Merge and parse all the audit log files and convert data into avro file.
+ *
+ * @param fileSystem filesystem
+ * @param logsPath source path of logs
+ * @param destPath destination path of merged log file
+ * @return true
+ * @throws IOException on any failure
+ */
+ public boolean mergeAndParseAuditLogFiles(FileSystem fileSystem,
+ Path logsPath,
+ Path destPath) throws IOException {
+
+ // Listing file in given path
+ RemoteIterator<LocatedFileStatus> listOfLogFiles =
+ fileSystem.listFiles(logsPath, true);
+
+ Path destFile = new Path(destPath, "AuditLogFile");
+
+ try (FSDataOutputStream fsDataOutputStream = fileSystem.create(destFile)) {
+
+ // Iterating over the list of files to merge and parse
+ while (listOfLogFiles.hasNext()) {
+ FileStatus fileStatus = listOfLogFiles.next();
+ int fileLength = (int) fileStatus.getLen();
+ byte[] byteBuffer = new byte[fileLength];
+ try (FSDataInputStream fsDataInputStream =
+ fileSystem.open(fileStatus.getPath())) {
Review Comment:
can you use the new openFile builder; there are examples such as
org.apache.hadoop.fs.AvroFSInput#AvroFSInput(org.apache.hadoop.fs.FileContext,
org.apache.hadoop.fs.Path) to show the basic strategy.
if you an add the filestatus in .withFileStatus() we can save a HEAD request
opening files on s3 and abfs, for faster reads
##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/audit/mapreduce/S3AAuditLogMergerAndParser.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit.mapreduce;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.specific.SpecificDatumWriter;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.audit.AvroDataRecord;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.AWS_LOG_REGEXP_GROUPS;
+import static org.apache.hadoop.fs.s3a.audit.S3LogParser.LOG_ENTRY_PATTERN;
+
+/**
+ * Merge all the audit logs present in a directory of
+ * multiple audit log files into a single audit log file.
+ */
+public class S3AAuditLogMergerAndParser {
+
+ public static final int MAX_LINE_LENGTH = 32000;
+ private static final Logger LOG =
+ LoggerFactory.getLogger(S3AAuditLogMergerAndParser.class);
+
+ /**
+ * parseAuditLog method helps in parsing the audit log
+ * into key-value pairs using regular expressions.
+ *
+ * @param singleAuditLog this is single audit log from merged audit log file
+ * @return it returns a map i.e, auditLogMap which contains key-value pairs
of a single audit log
+ */
+ public HashMap<String, String> parseAuditLog(String singleAuditLog) {
+ HashMap<String, String> auditLogMap = new HashMap<>();
+ if (singleAuditLog == null || singleAuditLog.length() == 0) {
+ LOG.debug(
+ "This is an empty string or null string, expected a valid string to
parse");
+ return auditLogMap;
+ }
+ final Matcher matcher = LOG_ENTRY_PATTERN.matcher(singleAuditLog);
+ boolean patternMatching = matcher.matches();
+ if (patternMatching) {
+ for (String key : AWS_LOG_REGEXP_GROUPS) {
+ try {
+ final String value = matcher.group(key);
+ auditLogMap.put(key, value);
+ } catch (IllegalStateException e) {
+ LOG.debug(String.valueOf(e));
Review Comment:
does this happen?
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/audit/TestAuditTool.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.audit;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.nio.file.Files;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.s3a.AbstractS3ATestBase;
+
+import static
org.apache.hadoop.fs.s3a.audit.TestS3AAuditLogMergerAndParser.SAMPLE_LOG_ENTRY;
+
+/**
+ * This will implement tests on AuditTool class.
+ */
+public class TestAuditTool extends AbstractS3ATestBase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestAuditTool.class);
+
+ private final AuditTool auditTool = new AuditTool();
+
+ /**
+ * Sample directories and files to test.
+ */
+ private File sampleFile;
+ private File sampleDir;
+ private File sampleDestDir;
+
+ /**
+ * Testing run method in AuditTool class by passing source and destination
+ * paths.
+ */
+ @Test
+ public void testRun() throws Exception {
+ sampleDir = Files.createTempDirectory("sampleDir").toFile();
+ sampleFile = File.createTempFile("sampleFile", ".txt", sampleDir);
+ try (FileWriter fw = new FileWriter(sampleFile)) {
+ fw.write(SAMPLE_LOG_ENTRY);
+ fw.flush();
+ }
+ sampleDestDir = Files.createTempDirectory("sampleDestDir").toFile();
+ Path logsPath = new Path(sampleDir.toURI());
+ Path destPath = new Path(sampleDestDir.toURI());
+ String[] args = {destPath.toString(), logsPath.toString()};
+ auditTool.run(args);
+ FileSystem fileSystem = destPath.getFileSystem(getConfiguration());
+ RemoteIterator<LocatedFileStatus> listOfDestFiles =
Review Comment:
better to just create the expected path, e.g
```
new Path(destPath, "AvroData.avro");
```
and open it. if the path isn't there, an exception gets raised. and it saves
the work of listing and scanning
> Merging of S3A Audit Logs
> -------------------------
>
> Key: HADOOP-18258
> URL: https://issues.apache.org/jira/browse/HADOOP-18258
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3
> Reporter: Sravani Gadey
> Assignee: Sravani Gadey
> Priority: Major
> Labels: pull-request-available
> Time Spent: 12.5h
> Remaining Estimate: 0h
>
> Merging audit log files containing huge number of audit logs collected from a
> job like Hive or Spark job containing various S3 requests like list, head,
> get and put requests.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]