This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 5d1dad7 DRILL-7532: Convert Syslog to EVF
5d1dad7 is described below
commit 5d1dad7828fe41a271e08cdb19cde84a1d78b268
Author: Charles Givre <[email protected]>
AuthorDate: Thu Dec 17 10:56:01 2020 -0500
DRILL-7532: Convert Syslog to EVF
---
contrib/format-syslog/pom.xml | 1 -
.../drill/exec/store/syslog/SyslogBatchReader.java | 335 ++++++++++++++++++
.../exec/store/syslog/SyslogFormatConfig.java | 5 +-
.../exec/store/syslog/SyslogFormatPlugin.java | 101 +++---
.../exec/store/syslog/SyslogRecordReader.java | 379 ---------------------
.../main/resources/bootstrap-format-plugins.json | 6 +-
.../drill/exec/store/syslog/TestSyslogFormat.java | 360 +++++++++++--------
.../src/test/resources/syslog/test.syslog1 | 2 +-
8 files changed, 615 insertions(+), 574 deletions(-)
diff --git a/contrib/format-syslog/pom.xml b/contrib/format-syslog/pom.xml
index 99052f3..54e67bd 100644
--- a/contrib/format-syslog/pom.xml
+++ b/contrib/format-syslog/pom.xml
@@ -50,7 +50,6 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.apache.drill</groupId>
<artifactId>drill-common</artifactId>
diff --git
a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java
b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java
new file mode 100644
index 0000000..6dd3e55
--- /dev/null
+++
b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.syslog;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.realityforge.jsyslog.message.StructuredDataParameter;
+import org.realityforge.jsyslog.message.SyslogMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SyslogBatchReader implements ManagedReader<FileSchemaNegotiator> {
+ private static final Logger logger =
LoggerFactory.getLogger(SyslogBatchReader.class);
+ private final String STRUCTURED_DATA_PREFIX = "structured_data_";
+ private final String STRUCTURED_DATA_MAP_NAME = "structured_data";
+ private final String RAW_COLUMN_NAME = "_raw";
+
+ private final int maxRecords;
+ private final SyslogFormatConfig config;
+ private final EasySubScan subScan;
+ private final Map<String, MinorType> mappedColumns = new LinkedHashMap<>();
+ private int lineCount;
+ private int errorCount;
+ private CustomErrorContext errorContext;
+ private InputStream fsStream;
+ private FileSplit split;
+ private BufferedReader reader;
+ private RowSetLoader rowWriter;
+ private List<ScalarWriter> writerArray;
+ private ScalarWriter rawColumnWriter;
+ private ScalarWriter messageWriter;
+ private TupleWriter structuredDataWriter;
+
+
+ public SyslogBatchReader(int maxRecords, SyslogFormatConfig config,
EasySubScan scan) {
+ this.maxRecords = maxRecords;
+ this.config = config;
+ this.subScan = scan;
+ populateMappedColumns();
+ }
+
+ @Override
+ public boolean open(FileSchemaNegotiator negotiator) {
+ split = negotiator.split();
+ openFile(negotiator);
+ negotiator.tableSchema(buildSchema(), false);
+ errorContext = negotiator.parentErrorContext();
+
+ ResultSetLoader loader = negotiator.build();
+ rowWriter = loader.writer();
+ writerArray = populateRowWriters();
+ rawColumnWriter = rowWriter.scalar(RAW_COLUMN_NAME);
+ messageWriter = rowWriter.scalar("message");
+ return true;
+ }
+
+ @Override
+ public boolean next() {
+ while (!rowWriter.isFull()) {
+ if (!processNextLine()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public void close() {
+ if (fsStream != null) {
+ AutoCloseables.closeSilently(fsStream);
+ fsStream = null;
+ }
+
+ if (reader != null) {
+ AutoCloseables.closeSilently(reader);
+ reader = null;
+ }
+ }
+
+ private void openFile(FileSchemaNegotiator negotiator) {
+ try {
+ fsStream =
negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+ } catch (IOException e) {
+ throw UserException
+ .dataReadError(e)
+ .message("Unable to open Syslog File %s", split.getPath())
+ .addContext(e.getMessage())
+ .addContext(errorContext)
+ .build(logger);
+ }
+ this.lineCount = 0;
+ reader = new BufferedReader(new InputStreamReader(fsStream));
+ }
+
+ public TupleMetadata buildSchema() {
+ SchemaBuilder builder = new SchemaBuilder();
+ for (Map.Entry<String, MinorType> entry : mappedColumns.entrySet()) {
+ builder.addNullable(entry.getKey(), entry.getValue());
+ }
+ if (! config.flattenStructuredData()) {
+ ColumnMetadata structuredDataMap =
MetadataUtils.newMap(STRUCTURED_DATA_MAP_NAME);
+ builder.add(structuredDataMap);
+ }
+
+ builder.addNullable("message", MinorType.VARCHAR);
+
+ // Add _raw column
+ ColumnMetadata colSchema = MetadataUtils.newScalar(RAW_COLUMN_NAME,
MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+ colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+ builder.add(colSchema);
+ return builder.buildSchema();
+ }
+
+ private List<ScalarWriter> populateRowWriters() {
+ List<ScalarWriter> writerArray = new ArrayList<>();
+ for (Map.Entry<String, MinorType> entry : mappedColumns.entrySet()) {
+ writerArray.add(rowWriter.scalar(entry.getKey()));
+ }
+
+ if (! config.flattenStructuredData()) {
+ structuredDataWriter = rowWriter.tuple(STRUCTURED_DATA_MAP_NAME);
+ }
+
+ return writerArray;
+ }
+
+ private void populateMappedColumns() {
+ mappedColumns.put("event_date", MinorType.TIMESTAMP);
+ mappedColumns.put("severity_code", MinorType.INT);
+ mappedColumns.put("facility_code", MinorType.INT);
+ mappedColumns.put("severity", MinorType.VARCHAR);
+ mappedColumns.put("facility", MinorType.VARCHAR);
+ mappedColumns.put("ip", MinorType.VARCHAR);
+ mappedColumns.put("app_name", MinorType.VARCHAR);
+ mappedColumns.put("process_id", MinorType.VARCHAR);
+ mappedColumns.put("message_id", MinorType.VARCHAR);
+ mappedColumns.put("structured_data_text", MinorType.VARCHAR);
+ }
+
+ private boolean processNextLine() {
+ // Check to see if the limit has been reached
+ if (rowWriter.limitReached(maxRecords)) {
+ return false;
+ }
+
+ String line;
+ try {
+ line = reader.readLine();
+
+ // A null line means end of file has been reached; stop reading
+ if (line == null) {
+ return false;
+ }
+
+ // Remove leading and trailing whitespace
+ line = line.trim();
+ if (line.length() == 0) {
+ // Skip empty lines
+ return true;
+ }
+
+ SyslogMessage parsedMessage =
SyslogMessage.parseStructuredSyslogMessage(line);
+ rowWriter.start();
+ writeStructuredColumns(parsedMessage);
+ writeStructuredData(parsedMessage);
+
+ if (isProjected(rawColumnWriter)) {
+ rawColumnWriter.setString(line);
+ }
+
+ if (isProjected(messageWriter)) {
+ logger.debug("Message: {}", parsedMessage.getMessage());
+ messageWriter.setString(parsedMessage.getMessage());
+ }
+
+ } catch (IOException e) {
+ errorCount++;
+ if (errorCount > config.getMaxErrors()) {
+ throw UserException
+ .dataReadError()
+ .message("Maximum Error Threshold Exceeded. Error reading Syslog
file at line %d", lineCount)
+ .addContext(e.getMessage())
+ .build(logger);
+ }
+ }
+ lineCount++;
+ rowWriter.save();
+ return true;
+ }
+
+ private void writeStructuredColumns(SyslogMessage parsedMessage) {
+ long milliseconds = parsedMessage.getTimestamp().getMillis();
+ writerArray.get(0).setTimestamp(new Instant(milliseconds));
+ writerArray.get(1).setInt(parsedMessage.getLevel().ordinal());
+ writerArray.get(2).setInt(parsedMessage.getFacility().ordinal());
+ setString(writerArray.get(3), parsedMessage.getLevel().name());
+ setString(writerArray.get(4), parsedMessage.getFacility().name());
+ setString(writerArray.get(5), parsedMessage.getHostname());
+ setString(writerArray.get(6), parsedMessage.getAppName());
+ setString(writerArray.get(7), parsedMessage.getProcId());
+ setString(writerArray.get(8), parsedMessage.getMsgId());
+
+ Map<String, List<StructuredDataParameter>> structuredData =
parsedMessage.getStructuredData();
+
+ if (structuredData != null) {
+
writerArray.get(9).setString(parsedMessage.getStructuredData().toString());
+ }
+ logger.debug("Successfully mapped known fields");
+ }
+
+ /**
+ * Write the flattened structured data fields to Drill vectors. The data in
the structured fields is not known in
+ * advance and also is not consistent between syslog entries, so we have to
add these fields on the fly. The only possible
+ * data type in these cases is VARCHAR.
+ * @param parsedMessage The parsed syslog message
+ */
+ private void writeStructuredData(SyslogMessage parsedMessage) {
+ Map<String, List<StructuredDataParameter>> structuredData =
parsedMessage.getStructuredData();
+ // Prevent NPE if the message contains no structured data
+ if (structuredData == null) {
+ return;
+ }
+
+ if (config.flattenStructuredData()) {
+ // Iterate over the structured data fields and map to Drill vectors
+ for (Map.Entry<String, List<StructuredDataParameter>> entry :
structuredData.entrySet()) {
+ for (StructuredDataParameter parameter : entry.getValue()) {
+ // These fields are not known in advance and are not necessarily
consistent
+ String fieldName = STRUCTURED_DATA_PREFIX + parameter.getName();
+ String fieldValue = parameter.getValue();
+ writeStringColumn(rowWriter, fieldName, fieldValue);
+ logger.debug("Writing {} {}", fieldName, fieldValue);
+ }
+ }
+ } else {
+ writeStructuredDataToMap(structuredData);
+ }
+ }
+
+ private void writeStructuredDataToMap(Map<String,
List<StructuredDataParameter>> structuredData) {
+ // Iterate over the structured data fields and write to a Drill map
+ for (Map.Entry<String, List<StructuredDataParameter>> entry :
structuredData.entrySet()) {
+ for (StructuredDataParameter parameter : entry.getValue()) {
+ // These fields are not known in advance and are not necessarily
consistent
+ String fieldName = parameter.getName();
+ String fieldValue = parameter.getValue();
+ writeStringColumn(structuredDataWriter, fieldName, fieldValue);
+ }
+ }
+ }
+
+ /**
+ * Writes data to a String column. If there is no ScalarWriter for the
particular column, this function will create one.
+ * @param rowWriter The ScalarWriter to which we are writing
+ * @param name The field name to be written
+ * @param value The field value to be written
+ */
+ private void writeStringColumn(TupleWriter rowWriter, String name, String
value) {
+ ScalarWriter colWriter = getColWriter(rowWriter, name,
TypeProtos.MinorType.VARCHAR);
+ colWriter.setString(value);
+ }
+
+ private ScalarWriter getColWriter(TupleWriter tupleWriter, String fieldName,
TypeProtos.MinorType type) {
+ int index = tupleWriter.tupleSchema().index(fieldName);
+ if (index == -1) {
+ ColumnMetadata colSchema = MetadataUtils.newScalar(fieldName, type,
TypeProtos.DataMode.OPTIONAL);
+ index = tupleWriter.addColumn(colSchema);
+ }
+ return tupleWriter.scalar(index);
+ }
+
+ /**
+ * The ScalarWriter objects have a method to verify whether the writer is
projected or not; however, it does not
+ * seem to take the star queries into account. This method checks to see if
the query is a star query and includes that
+ * in the determination of whether the column is projected or not.
+ * @param writer A scalarWriter
+ * @return True if the column is projected, false if not.
+ */
+ private boolean isProjected(ScalarWriter writer) {
+ // Case for star query
+ if (subScan.getColumns().size() == 1 &&
subScan.getColumns().get(0).isDynamicStar()) {
+ return true;
+ } else {
+ return writer.isProjected();
+ }
+ }
+
+ private void setString(ScalarWriter writer, String value) {
+ if (value == null) {
+ return;
+ }
+ writer.setString(value);
+ }
+}
diff --git
a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java
b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java
index e851e31..8f2fc3f 100644
---
a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java
+++
b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java
@@ -49,14 +49,17 @@ public class SyslogFormatConfig implements
FormatPluginConfig {
this.flattenStructuredData = flattenStructuredData == null ? false :
flattenStructuredData;
}
- public boolean getFlattenStructuredData() {
+ @JsonProperty("flattenStructuredData")
+ public boolean flattenStructuredData() {
return flattenStructuredData;
}
+ @JsonProperty("maxErrors")
public int getMaxErrors() {
return maxErrors;
}
+ @JsonProperty("extensions")
public List<String> getExtensions() {
return extensions;
}
diff --git
a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
index d21035b..88b53d1 100644
---
a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
+++
b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
@@ -18,82 +18,77 @@
package org.apache.drill.exec.store.syslog;
-import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.util.List;
public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
public static final String DEFAULT_NAME = "syslog";
- private final SyslogFormatConfig formatConfig;
- public SyslogFormatPlugin(String name, DrillbitContext context,
- Configuration fsConf, StoragePluginConfig
storageConfig,
- SyslogFormatConfig formatConfig) {
- super(name, context, fsConf, storageConfig, formatConfig,
- true, // readable
- false, // writable
- true, // blockSplittable
- true, // compressible
- Lists.newArrayList(formatConfig.getExtensions()),
- DEFAULT_NAME);
- this.formatConfig = formatConfig;
- }
+ private static class SyslogReaderFactory extends FileReaderFactory {
- @Override
- public RecordReader getRecordReader(FragmentContext context, DrillFileSystem
dfs, FileWork fileWork,
- List<SchemaPath> columns, String
userName) {
- return new SyslogRecordReader(context, dfs, fileWork, columns, userName,
formatConfig);
- }
+ private final int maxRecords;
+ private final SyslogFormatConfig formatConfig;
+ private final EasySubScan scan;
- @Override
- public boolean supportsPushDown() {
- return true;
- }
+ public SyslogReaderFactory(int maxRecords, SyslogFormatConfig
formatConfig, EasySubScan scan) {
+ this.maxRecords = maxRecords;
+ this.formatConfig = formatConfig;
+ this.scan = scan;
+ }
- @Override
- public RecordWriter getRecordWriter(FragmentContext context,
- EasyWriter writer) throws
UnsupportedOperationException {
- throw new UnsupportedOperationException("Drill does not support writing
records to Syslog format.");
+ @Override
+ public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+ return new SyslogBatchReader(maxRecords, formatConfig, scan);
+ }
}
- @Override
- public int getReaderOperatorType() {
- return CoreOperatorType.SYSLOG_SUB_SCAN_VALUE;
+ public SyslogFormatPlugin(String name, DrillbitContext context,
+ Configuration fsConf, StoragePluginConfig
storageConfig,
+ SyslogFormatConfig formatConfig) {
+ super(name, easyConfig(fsConf, formatConfig), context, storageConfig,
formatConfig);
}
- @Override
- public int getWriterOperatorType() {
- throw new UnsupportedOperationException("Drill does not support writing
records to Syslog format.");
+ private static EasyFormatConfig easyConfig(Configuration fsConf,
SyslogFormatConfig pluginConfig) {
+ EasyFormatConfig config = new EasyFormatConfig();
+ config.readable = true;
+ config.writable = false;
+ config.blockSplittable = false;
+ config.compressible = true;
+ config.supportsProjectPushdown = true;
+ config.extensions = pluginConfig.getExtensions();
+ config.fsConf = fsConf;
+ config.defaultName = DEFAULT_NAME;
+ config.readerOperatorType = CoreOperatorType.SYSLOG_SUB_SCAN_VALUE;
+ config.useEnhancedScan = true;
+ config.supportsLimitPushdown = true;
+ return config;
}
@Override
- public boolean supportsStatistics() {
- return false;
+ public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
+ EasySubScan scan, OptionManager options) {
+ return new SyslogBatchReader(scan.getMaxRecords(), formatConfig, scan);
}
@Override
- public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
- throw new UnsupportedOperationException("unimplemented");
- }
+ protected FileScanBuilder frameworkBuilder(OptionManager options,
EasySubScan scan) {
+ FileScanBuilder builder = new FileScanBuilder();
+ builder.setReaderFactory(new SyslogReaderFactory(scan.getMaxRecords(),
formatConfig, scan));
- @Override
- public void writeStatistics(TableStatistics statistics, FileSystem fs, Path
statsTablePath) {
- throw new UnsupportedOperationException("unimplemented");
+ initScanBuilder(builder, scan);
+ builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+ return builder;
}
}
diff --git
a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
deleted file mode 100644
index 0f39887..0000000
---
a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.syslog;
-
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.realityforge.jsyslog.message.StructuredDataParameter;
-import org.realityforge.jsyslog.message.SyslogMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
-
-public class SyslogRecordReader extends AbstractRecordReader {
-
- private static final Logger logger =
LoggerFactory.getLogger(SyslogRecordReader.class);
- private static final int MAX_RECORDS_PER_BATCH = 4096;
-
- private final DrillFileSystem fileSystem;
- private final FileWork fileWork;
- private final String userName;
- private BufferedReader reader;
- private DrillBuf buffer;
- private VectorContainerWriter writer;
- private final int maxErrors;
- private final boolean flattenStructuredData;
- private int errorCount;
- private int lineCount;
- private final List<SchemaPath> projectedColumns;
- private String line;
-
- public SyslogRecordReader(FragmentContext context,
- DrillFileSystem fileSystem,
- FileWork fileWork,
- List<SchemaPath> columns,
- String userName,
- SyslogFormatConfig config) throws
OutOfMemoryException {
-
- this.fileSystem = fileSystem;
- this.fileWork = fileWork;
- this.userName = userName;
- this.maxErrors = config.getMaxErrors();
- this.errorCount = 0;
- this.buffer = context.getManagedBuffer().reallocIfNeeded(4096);
- this.projectedColumns = columns;
- this.flattenStructuredData = config.getFlattenStructuredData();
-
- setColumns(columns);
- }
-
- @Override
- public void setup(final OperatorContext context, final OutputMutator output)
{
- openFile();
- this.writer = new VectorContainerWriter(output);
- }
-
- private void openFile() {
- InputStream in;
- try {
- in = fileSystem.openPossiblyCompressedStream(fileWork.getPath());
- } catch (Exception e) {
- throw UserException
- .dataReadError(e)
- .message("Failed to open open input file: %s",
fileWork.getPath())
- .addContext("User name", this.userName)
- .build(logger);
- }
- this.lineCount = 0;
- reader = new BufferedReader(new InputStreamReader(in));
- }
-
- @Override
- public int next() {
- this.writer.allocate();
- this.writer.reset();
-
- int recordCount = 0;
-
- try {
- BaseWriter.MapWriter map = this.writer.rootAsMap();
- String line;
-
- while (recordCount < MAX_RECORDS_PER_BATCH && (line =
this.reader.readLine()) != null) {
- lineCount++;
-
- // Skip empty lines
- line = line.trim();
- if (line.length() == 0) {
- continue;
- }
- this.line = line;
-
- try {
- SyslogMessage parsedMessage =
SyslogMessage.parseStructuredSyslogMessage(line);
-
- this.writer.setPosition(recordCount);
- map.start();
-
- if (isStarQuery()) {
- writeAllColumns(map, parsedMessage);
- } else {
- writeProjectedColumns(map, parsedMessage);
- }
- map.end();
- recordCount++;
-
- } catch (Exception e) {
- errorCount++;
- if (errorCount > maxErrors) {
- throw UserException
- .dataReadError()
- .message("Maximum Error Threshold Exceeded: ")
- .addContext("Line: " + lineCount)
- .addContext(e.getMessage())
- .build(logger);
- }
- }
- }
-
- this.writer.setValueCount(recordCount);
- return recordCount;
-
- } catch (final Exception e) {
- errorCount++;
- if (errorCount > maxErrors) {
- throw UserException.dataReadError()
- .message("Error parsing file")
- .addContext(e.getMessage())
- .build(logger);
- }
- }
-
- return recordCount;
- }
-
- private void writeAllColumns(BaseWriter.MapWriter map, SyslogMessage
parsedMessage) {
-
- long milliseconds = 0;
- try {
- milliseconds = parsedMessage.getTimestamp().getMillis();
- } catch (final Exception e) {
- errorCount++;
- if (errorCount > maxErrors) {
- throw UserException.dataReadError()
- .message("Syslog Format Plugin: Error parsing date")
- .addContext(e.getMessage())
- .build(logger);
- }
- }
- map.timeStamp("event_date").writeTimeStamp(milliseconds);
- map.integer("severity_code").writeInt(parsedMessage.getLevel().ordinal());
-
map.integer("facility_code").writeInt(parsedMessage.getFacility().ordinal());
-
- mapStringField("severity", parsedMessage.getLevel().name(), map);
- mapStringField("facility", parsedMessage.getFacility().name(), map);
- mapStringField("ip", parsedMessage.getHostname(), map);
- mapStringField("app_name", parsedMessage.getAppName(), map);
- mapStringField("process_id", parsedMessage.getProcId(), map);
- mapStringField("message_id", parsedMessage.getMsgId(), map);
-
- if (parsedMessage.getStructuredData() != null) {
- mapStringField("structured_data_text",
parsedMessage.getStructuredData().toString(), map);
- Map<String, List<StructuredDataParameter>> structuredData =
parsedMessage.getStructuredData();
- if (flattenStructuredData) {
- mapFlattenedStructuredData(structuredData, map);
- } else {
- mapComplexField("structured_data", structuredData, map);
- }
- }
- mapStringField("message", parsedMessage.getMessage(), map);
- }
-
- private void writeProjectedColumns(BaseWriter.MapWriter map, SyslogMessage
parsedMessage) throws UserException {
- String columnName;
-
- for (SchemaPath col : projectedColumns) {
-
- //Case for nested fields
- if (col.getAsNamePart().hasChild()) {
- String fieldName = col.getAsNamePart().getChild().getName();
- mapStructuredDataField(fieldName, map, parsedMessage);
- } else {
- columnName = col.getAsNamePart().getName();
-
- //Extracts fields from structured data IF the user selected to flatten
these fields
- if ((!columnName.equals("structured_data_text")) &&
columnName.startsWith("structured_data_")) {
- String fieldName = columnName.replace("structured_data_", "");
- String value = getFieldFromStructuredData(fieldName, parsedMessage);
- mapStringField(columnName, value, map);
- } else {
- switch (columnName) {
- case "event_date":
- long milliseconds = parsedMessage.getTimestamp().getMillis();
//TODO put in try/catch
- map.timeStamp("event_date").writeTimeStamp(milliseconds);
- break;
- case "severity_code":
-
map.integer("severity_code").writeInt(parsedMessage.getLevel().ordinal());
- break;
- case "facility_code":
-
map.integer("facility_code").writeInt(parsedMessage.getFacility().ordinal());
- break;
- case "severity":
- mapStringField("severity", parsedMessage.getLevel().name(), map);
- break;
- case "facility":
- mapStringField("facility", parsedMessage.getFacility().name(),
map);
- break;
- case "ip":
- mapStringField("ip", parsedMessage.getHostname(), map);
- break;
- case "app_name":
- mapStringField("app_name", parsedMessage.getAppName(), map);
- break;
- case "process_id":
- mapStringField("process_id", parsedMessage.getProcId(), map);
- break;
- case "msg_id":
- mapStringField("message_id", parsedMessage.getMsgId(), map);
- break;
- case "structured_data":
- if (parsedMessage.getStructuredData() != null) {
- Map<String, List<StructuredDataParameter>> structured_data =
parsedMessage.getStructuredData();
- mapComplexField("structured_data", structured_data, map);
- }
- break;
- case "structured_data_text":
- if (parsedMessage.getStructuredData() != null) {
- mapStringField("structured_data_text",
parsedMessage.getStructuredData().toString(), map);
- } else {
- mapStringField("structured_data_text", "", map);
- }
- break;
- case "message":
- mapStringField("message", parsedMessage.getMessage(), map);
- break;
- case "_raw":
- mapStringField("_raw", this.line, map);
- break;
-
- default:
- mapStringField(columnName, "", map);
- }
- }
- }
- }
- }
-
- //Helper function to map strings
- private void mapStringField(String name, String value, BaseWriter.MapWriter
map) {
- if (value == null) {
- return;
- }
- try {
- byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
- int stringLength = bytes.length;
- this.buffer = buffer.reallocIfNeeded(stringLength);
- this.buffer.setBytes(0, bytes, 0, stringLength);
- map.varChar(name).writeVarChar(0, stringLength, buffer);
- } catch (Exception e) {
- throw UserException
- .dataWriteError()
- .addContext("Could not write string: ")
- .addContext(e.getMessage())
- .build(logger);
- }
- }
-
- //Helper function to flatten structured data
- private void mapFlattenedStructuredData(Map<String,
List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
- for (Map.Entry<String, List<StructuredDataParameter>> entry :
data.entrySet()) {
- for (StructuredDataParameter parameter : entry.getValue()) {
- String fieldName = "structured_data_" + parameter.getName();
- String fieldValue = parameter.getValue();
- mapStringField(fieldName, fieldValue, map);
- }
- }
- }
-
- //Gets field from the Structured Data Construct
- private String getFieldFromStructuredData(String fieldName, SyslogMessage
parsedMessage) {
- for (Map.Entry<String, List<StructuredDataParameter>> entry :
parsedMessage.getStructuredData().entrySet()) {
- for (StructuredDataParameter d : entry.getValue()) {
- if (d.getName().equals(fieldName)) {
- return d.getValue();
- }
- }
- }
- return null;
- }
-
- //Helper function to map arrays
- private void mapComplexField(String mapName, Map<String,
List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
- for (Map.Entry<String, List<StructuredDataParameter>> entry :
data.entrySet()) {
- List<StructuredDataParameter> dataParameters = entry.getValue();
- String fieldName;
- String fieldValue;
-
- for (StructuredDataParameter parameter : dataParameters) {
- fieldName = parameter.getName();
- fieldValue = parameter.getValue();
-
- VarCharHolder rowHolder = new VarCharHolder();
-
- byte[] rowStringBytes = fieldValue.getBytes();
- this.buffer.reallocIfNeeded(rowStringBytes.length);
- this.buffer.setBytes(0, rowStringBytes);
- rowHolder.start = 0;
- rowHolder.end = rowStringBytes.length;
- rowHolder.buffer = this.buffer;
-
- map.map(mapName).varChar(fieldName).write(rowHolder);
- }
- }
- }
-
- private void mapStructuredDataField(String fieldName, BaseWriter.MapWriter
map, SyslogMessage parsedMessage) {
- String fieldValue = getFieldFromStructuredData(fieldName, parsedMessage);
- VarCharHolder rowHolder = new VarCharHolder();
-
- byte[] rowStringBytes = fieldValue.getBytes();
- this.buffer.reallocIfNeeded(rowStringBytes.length);
- this.buffer.setBytes(0, rowStringBytes);
- rowHolder.start = 0;
- rowHolder.end = rowStringBytes.length;
- rowHolder.buffer = this.buffer;
-
- map.map("structured_data").varChar(fieldName).write(rowHolder);
- }
-
- public SimpleDateFormat getValidDateObject(String d) {
- SimpleDateFormat tempDateFormat;
- if (d != null && !d.isEmpty()) {
- tempDateFormat = new SimpleDateFormat(d);
- } else {
- throw UserException
- .parseError()
- .message("Invalid date format")
- .build(logger);
- }
- return tempDateFormat;
- }
-
- public void close() throws Exception {
- this.reader.close();
- }
-}
diff --git
a/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json
b/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json
index ee5a396..ea55012 100644
--- a/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json
+++ b/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json
@@ -7,7 +7,8 @@
"type": "syslog",
"extensions": [
"syslog"
- ]
+ ],
+ "maxErrors" : 0
}
}
},
@@ -18,7 +19,8 @@
"type": "syslog",
"extensions": [
"syslog"
- ]
+ ],
+ "maxErrors" : 0
}
}
}
diff --git
a/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
b/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
index c7bd833..4fb15c5 100644
---
a/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
+++
b/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
@@ -17,17 +17,19 @@
*/
package org.apache.drill.exec.store.syslog;
+import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.logical.FormatPluginConfig;
import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.rpc.RpcException;
import org.apache.drill.test.ClusterTest;
-import org.apache.drill.test.BaseDirTestWatcher;
import org.apache.drill.exec.physical.rowSet.RowSet;
import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
import org.apache.drill.test.ClusterFixture;
@@ -35,20 +37,26 @@ import org.apache.drill.test.rowSet.RowSetComparison;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.junit.BeforeClass;
import org.junit.Test;
-import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
-public class TestSyslogFormat extends ClusterTest {
+import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+import static org.junit.Assert.assertEquals;
- @ClassRule
- public static final BaseDirTestWatcher dirTestWatcher = new
BaseDirTestWatcher();
+@Category(RowSetTests.class)
+public class TestSyslogFormat extends ClusterTest {
@BeforeClass
public static void setup() throws Exception {
-
ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+ // Needed for compressed file unit test
+ dirTestWatcher.copyResourceToRoot(Paths.get("syslog/"));
+
defineSyslogPlugin();
}
- private static void defineSyslogPlugin() throws ExecutionSetupException {
+ private static void defineSyslogPlugin() {
Map<String, FormatPluginConfig> formats = new HashMap<>();
formats.put("sample", new SyslogFormatConfig(
Collections.singletonList("syslog"), null, null));
@@ -58,6 +66,7 @@ public class TestSyslogFormat extends ClusterTest {
// Define a temporary plugin for the "cp" storage plugin.
cluster.defineFormats("cp", formats);
+ cluster.defineFormats("dfs", formats);
}
@Test
@@ -76,88 +85,91 @@ public class TestSyslogFormat extends ClusterTest {
RowSet results = client.queryBuilder().sql(sql).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("event_date", TypeProtos.MinorType.TIMESTAMP,
TypeProtos.DataMode.OPTIONAL)
- .add("severity_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
- .add("severity", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("facility_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
- .add("facility", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("ip", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("process_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("message_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_text", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .buildSchema();
+ .add("event_date", TypeProtos.MinorType.TIMESTAMP,
TypeProtos.DataMode.OPTIONAL)
+ .add("severity_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("severity", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("facility_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("facility", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("process_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("message_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_text", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1065910455003L, 2, "CRIT", 4, "AUTH",
"mymachine.example.com", null, "", "")
- .addRow(482196050520L, 2, "CRIT", 4, "AUTH",
"mymachine.example.com", null, "", "")
- .addRow(482196050520L, 2, "CRIT", 4, "AUTH",
"mymachine.example.com", null, "", "")
- .addRow(1065910455003L, 2, "CRIT", 4, "AUTH",
"mymachine.example.com", null, "", "")
- .addRow(1061727255000L, 2, "CRIT", 4, "AUTH",
"mymachine.example.com", null, "", "")
- .addRow(1061727255000L, 5, "NOTICE", 20, "LOCAL4", "192.0.2.1",
"8710", "", "")
- .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4",
"mymachine.example.com", null, "", "{examplePriority@32473=[class=high],
exampleSDID@32473=[iut=3, eventSource=Application, eventID=1011]}")
- .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4",
"mymachine.example.com", null, "", "{examplePriority@32473=[class=high],
exampleSDID@32473=[iut=3, eventSource=Application, eventID=1011]}")
- .build();
+ .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com",
null, "ID47", null)
+ .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com",
null, "ID47", null)
+ .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com",
null, "ID47", null)
+ .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com",
null, "ID47", null)
+ .addRow(1061727255000L, 2, "CRIT", 4, "AUTH", "mymachine.example.com",
null, "ID47", null)
+ .addRow(1061727255000L, 5, "NOTICE", 20, "LOCAL4", "192.0.2.1", "8710",
null, null)
+ .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4",
"mymachine.example.com", null, "ID47", "{examplePriority@32473=[class=high],
exampleSDID@32473=[iut=3, " +
+ "eventSource=Application, eventID=1011]}")
+ .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4",
"mymachine.example.com", null, "ID47", "{examplePriority@32473=[class=high],
exampleSDID@32473=[iut=3, " +
+ "eventSource=Application, eventID=1011]}")
+ .build();
new RowSetComparison(expected).verifyAndClearAll(results);
}
@Test
- public void testStarQuery() throws RpcException {
+ public void testStarQuery() throws Exception {
String sql = "SELECT * FROM cp.`syslog/logs1.syslog`";
-
RowSet results = client.queryBuilder().sql(sql).rowSet();
-
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("event_date", TypeProtos.MinorType.TIMESTAMP,
TypeProtos.DataMode.OPTIONAL)
- .add("severity_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
- .add("facility_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
- .add("severity", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("facility", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("ip", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("app_name", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("message_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("message", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("process_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .buildSchema();
+ .add("event_date", TypeProtos.MinorType.TIMESTAMP,
TypeProtos.DataMode.OPTIONAL)
+ .add("severity_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("facility_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("severity", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("facility", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("app_name", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("process_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("message_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_text", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data", MinorType.MAP, DataMode.REQUIRED)
+ .add("message", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1065910455003L, 2, 4, "CRIT", "AUTH",
"mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on
/dev/pts/8", null)
- .addRow(482196050520L, 2, 4, "CRIT", "AUTH",
"mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on
/dev/pts/8", null)
- .addRow(482196050520L, 2, 4, "CRIT", "AUTH",
"mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on
/dev/pts/8", null)
- .addRow(1065910455003L, 2, 4, "CRIT", "AUTH",
"mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on
/dev/pts/8", null)
- .addRow(1061727255000L, 2, 4, "CRIT", "AUTH",
"mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on
/dev/pts/8", null)
- .addRow(1061727255000L, 5, 20, "NOTICE", "LOCAL4", "192.0.2.1",
"myproc", null, "%% It's time to make the do-nuts.", "8710")
- .build();
-
+ .addRow(1065910455003L, 2, 4, "CRIT", "AUTH", "mymachine.example.com",
"su", null, "ID47", null, mapArray(), "BOM'su root' failed for lonvick on
/dev/pts/8")
+ .addRow(482196050520L, 2, 4, "CRIT", "AUTH", "mymachine.example.com",
"su", null, "ID47", null, mapArray(),"BOM'su root' failed for lonvick on
/dev/pts/8")
+ .addRow(482196050520L, 2, 4, "CRIT", "AUTH", "mymachine.example.com",
"su", null, "ID47", null, mapArray(),"BOM'su root' failed for lonvick on
/dev/pts/8")
+ .addRow(1065910455003L, 2, 4, "CRIT", "AUTH", "mymachine.example.com",
"su", null, "ID47", null, mapArray(),"BOM'su root' failed for lonvick on
/dev/pts/8")
+ .addRow(1061727255000L, 2, 4, "CRIT", "AUTH", "mymachine.example.com",
"su", null, "ID47", null, mapArray(),"BOM'su root' failed for lonvick on
/dev/pts/8")
+ .addRow(1061727255000L, 5, 20, "NOTICE", "LOCAL4", "192.0.2.1",
"myproc", "8710", null, null, mapArray(),"%% It's time to make the do-nuts.")
+ .build();
+
+ assertEquals(6, results.rowCount());
new RowSetComparison(expected).verifyAndClearAll(results);
}
@Test
- public void testRawQuery() throws RpcException {
+ public void testRawQuery() throws Exception {
String sql = "SELECT _raw FROM cp.`syslog/logs.syslog`";
RowSet results = client.queryBuilder().sql(sql).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("_raw", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .buildSchema();
+ .add("_raw", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su -
ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
- .addRow("<34>1 1985-04-12T19:20:50.52-04:00 mymachine.example.com
su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
- .addRow("<34>1 1985-04-12T23:20:50.52Z mymachine.example.com su -
ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
- .addRow("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su -
ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
- .addRow("<34>1 2003-08-24T05:14:15.000003-07:00
mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on
/dev/pts/8")
- .addRow("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc
8710 - - %% It's time to make the do-nuts.")
- .addRow("<165>1 2003-10-11T22:14:15.003Z mymachine.example.com
evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\"
eventID=\"1011\"][examplePriority@32473 class=\"high\"]")
- .addRow("<165>1 2003-10-11T22:14:15.003Z mymachine.example.com
evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\"
eventID=\"1011\"][examplePriority@32473 class=\"high\"] - and thats a wrap!")
- .build();
+ .addRow("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47
- BOM'su root' failed for lonvick on /dev/pts/8")
+ .addRow("<34>1 1985-04-12T19:20:50.52-04:00 mymachine.example.com su -
ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
+ .addRow("<34>1 1985-04-12T23:20:50.52Z mymachine.example.com su - ID47 -
BOM'su root' failed for lonvick on /dev/pts/8")
+ .addRow("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47
- BOM'su root' failed for lonvick on /dev/pts/8")
+ .addRow("<34>1 2003-08-24T05:14:15.000003-07:00 mymachine.example.com su
- ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
+ .addRow("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 -
- %% It's time to make the do-nuts.")
+ .addRow("<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog
- ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\"
eventID=\"1011\"][examplePriority@32473 class=\"high\"]")
+ .addRow("<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog
- ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\"
eventID=\"1011\"][examplePriority@32473 class=\"high\"] - and thats a wrap!")
+ .build();
new RowSetComparison(expected).verifyAndClearAll(results);
}
@Test
- public void testStructuredDataQuery() throws RpcException {
+ public void testStructuredDataQuery() throws Exception {
String sql = "SELECT syslog_data.`structured_data`.`UserAgent` AS
UserAgent, " +
"syslog_data.`structured_data`.`UserHostAddress` AS
UserHostAddress," +
"syslog_data.`structured_data`.`BrowserSession` AS
BrowserSession," +
@@ -177,71 +189,72 @@ public class TestSyslogFormat extends ClusterTest {
RowSet results = client.queryBuilder().sql(sql).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("UserAgent", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("UserHostAddress", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("BrowserSession", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("Realm", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("Appliance", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("Company", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("UserID", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("PEN", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("HostName", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("Category", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("Priority", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .buildSchema();
+ .add("UserAgent", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("UserHostAddress", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("BrowserSession", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("Realm", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("Appliance", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("Company", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("UserID", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("PEN", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("HostName", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("Category", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("Priority", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow("Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0)
like Gecko", "192.168.2.132",
- "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0",
"secureauthqa.gosecureauth.com", "SecureAuth Corporation",
- "Tester2", "27389", "192.168.2.132", "AUDIT", "4")
- .build();
+ .addRow("Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like
Gecko", "192.168.2.132",
+ "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0",
"secureauthqa.gosecureauth.com", "SecureAuth Corporation",
+ "Tester2", "27389", "192.168.2.132", "AUDIT", "4")
+ .build();
new RowSetComparison(expected).verifyAndClearAll(results);
}
@Test
- public void testStarFlattenedStructuredDataQuery() throws RpcException {
+ public void testStarFlattenedStructuredDataQuery() throws Exception {
String sql = "SELECT * FROM cp.`syslog/test.syslog1`";
RowSet results = client.queryBuilder().sql(sql).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("event_date", TypeProtos.MinorType.TIMESTAMP,
TypeProtos.DataMode.OPTIONAL)
- .add("severity_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
- .add("facility_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
- .add("severity", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("facility", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("ip", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("app_name", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("process_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("message_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_text", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_UserAgent", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_UserHostAddress",
TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_BrowserSession",
TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_Realm", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_Appliance", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_Company", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_UserID", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_PEN", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_HostName", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_Category", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_Priority", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("message", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .buildSchema();
+ .add("event_date", TypeProtos.MinorType.TIMESTAMP,
TypeProtos.DataMode.OPTIONAL)
+ .add("severity_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("facility_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("severity", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("facility", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("app_name", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("process_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("message_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_text", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("message", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_UserAgent", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_UserHostAddress", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_BrowserSession", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_Realm", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_Appliance", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_Company", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_UserID", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_PEN", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_HostName", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_Category", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_Priority", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1438811939693L, 6, 10, "INFO", "AUTHPRIV",
"192.168.2.132", "SecureAuth0", "23108", "ID52020",
- "{SecureAuth@27389=[UserAgent=Mozilla/5.0 (Windows NT 6.1;
WOW64; Trident/7.0; rv:11.0) like Gecko, UserHostAddress=192.168.2.132,
BrowserSession=0gvhdi5udjuqtweprbgoxilc, Realm=SecureAuth0,
Appliance=secureauthqa.gosecureauth.com, Company=SecureAuth Corporation,
UserID=Tester2, PEN=27389, HostName=192.168.2.132, Category=AUDIT,
Priority=4]}",
- "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0)
like Gecko", "192.168.2.132",
- "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0",
"secureauthqa.gosecureauth.com", "SecureAuth Corporation",
- "Tester2", "27389", "192.168.2.132", "AUDIT", "4", "Found
the user for retrieving user's profile")
+ .addRow(1438811939693L, 6, 10, "INFO", "AUTHPRIV", "192.168.2.132",
"SecureAuth0", "23108", "ID52020", "{SecureAuth@27389=[UserAgent=Mozilla/5.0
(Windows NT 6.1; WOW64; " +
+ "Trident/7.0; rv:11.0) like Gecko, UserHostAddress=192.168.2.132,
BrowserSession=0gvhdi5udjuqtweprbgoxilc, Realm=SecureAuth0,
Appliance=secureauthqa.gosecureauth.com, " +
+ "Company=SecureAuth Corporation, UserID=Tester2, PEN=27389,
HostName=192.168.2.132, Category=AUDIT, Priority=4]}", "Found the user for
retrieving user's profile", "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0;
rv:11.0) like Gecko", "192.168.2.132", "0gvhdi5udjuqtweprbgoxilc",
"SecureAuth0", "secureauthqa.gosecureauth.com", "SecureAuth Corporation",
"Tester2", "27389", "192.168.2.132", "AUDIT", "4")
+ .addRow(1459529040580L, 6, 16, "INFO", "LOCAL0", "MacBook-Pro-3", null,
"94473", null, null,
+
"{\"pid\":94473,\"hostname\":\"MacBook-Pro-3\",\"level\":30,\"msg\":\"hello
world\",\"time\":1459529098958,\"v\":1}", null, null, null, null, null, null,
null, null, null, null, null)
.build();
+ assertEquals(2, results.rowCount());
new RowSetComparison(expected).verifyAndClearAll(results);
}
@Test
- public void testExplicitFlattenedStructuredDataQuery() throws RpcException {
+ public void testExplicitFlattenedStructuredDataQuery() throws Exception {
String sql = "SELECT event_date," +
"severity_code," +
"facility_code," +
@@ -268,39 +281,112 @@ public class TestSyslogFormat extends ClusterTest {
RowSet results = client.queryBuilder().sql(sql).rowSet();
TupleMetadata expectedSchema = new SchemaBuilder()
- .add("event_date", TypeProtos.MinorType.TIMESTAMP,
TypeProtos.DataMode.OPTIONAL)
- .add("severity_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
- .add("facility_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
- .add("severity", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("facility", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("ip", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("app_name", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("process_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("message_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_text", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_UserAgent", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_UserHostAddress",
TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_BrowserSession",
TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_Realm", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_Appliance", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_Company", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_UserID", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_PEN", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_HostName", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_Category", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("structured_data_Priority", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .add("message", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
- .buildSchema();
-
+ .add("event_date", TypeProtos.MinorType.TIMESTAMP,
TypeProtos.DataMode.OPTIONAL)
+ .add("severity_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("facility_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("severity", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("facility", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("app_name", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("process_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("message_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_text", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_UserAgent", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_UserHostAddress", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_BrowserSession", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_Realm", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_Appliance", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_Company", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_UserID", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_PEN", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_HostName", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_Category", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_Priority", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("message", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
+
+ assertEquals(2, results.rowCount());
RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .addRow(1438811939693L, 6, 10, "INFO", "AUTHPRIV",
"192.168.2.132", "SecureAuth0", "23108", "",
- "{SecureAuth@27389=[UserAgent=Mozilla/5.0 (Windows NT 6.1;
WOW64; Trident/7.0; rv:11.0) like Gecko, UserHostAddress=192.168.2.132,
BrowserSession=0gvhdi5udjuqtweprbgoxilc, Realm=SecureAuth0,
Appliance=secureauthqa.gosecureauth.com, Company=SecureAuth Corporation,
UserID=Tester2, PEN=27389, HostName=192.168.2.132, Category=AUDIT,
Priority=4]}",
- "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0)
like Gecko", "192.168.2.132",
- "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0",
"secureauthqa.gosecureauth.com", "SecureAuth Corporation",
- "Tester2", "27389", "192.168.2.132", "AUDIT", "4", "Found
the user for retrieving user's profile")
+ .addRow(1438811939693L, 6, 10, "INFO", "AUTHPRIV", "192.168.2.132",
"SecureAuth0", "23108", "ID52020", "{SecureAuth@27389=[UserAgent=Mozilla/5.0
(Windows NT 6.1; " +
+ "WOW64; Trident/7.0; rv:11.0) like Gecko, UserHostAddress=192.168.2.132,
BrowserSession=0gvhdi5udjuqtweprbgoxilc, Realm=SecureAuth0,
Appliance=secureauthqa.gosecureauth.com," +
+ " Company=SecureAuth Corporation, UserID=Tester2, PEN=27389,
HostName=192.168.2.132, Category=AUDIT, Priority=4]}", "Mozilla/5.0 (Windows NT
6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", "192.168.2.132",
"0gvhdi5udjuqtweprbgoxilc", "SecureAuth0", "secureauthqa.gosecureauth.com",
"SecureAuth Corporation", "Tester2", "27389", "192.168.2.132", "AUDIT", "4",
"Found the user for retrieving user's profile")
+ .addRow(1459529040580L, 6, 16, "INFO", "LOCAL0", "MacBook-Pro-3", null,
"94473", null, null, null, null, null, null, null, null, null, null, null,
null, null,
+
"{\"pid\":94473,\"hostname\":\"MacBook-Pro-3\",\"level\":30,\"msg\":\"hello
world\",\"time\":1459529098958,\"v\":1}")
.build();
+ assertEquals(2, results.rowCount());
+ new RowSetComparison(expected).verifyAndClearAll(results);
+ }
+
+ @Test
+ public void testCount() throws Exception {
+ String sql = "SELECT COUNT(*) FROM cp.`syslog/logs1.syslog` ";
+ long result = client.queryBuilder().sql(sql).singletonLong();
+ assertEquals(6L, result);
+ }
+
+ @Test
+ public void testSerDe() throws Exception {
+ String sql = "SELECT COUNT(*) AS cnt FROM dfs.`syslog/logs1.syslog`";
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+ assertEquals("Counts should match", 6L, cnt);
+ }
+
+ @Test
+ public void testLimitPushdown() throws Exception {
+ String sql = "SELECT * FROM cp.`syslog/logs1.syslog` LIMIT 5";
+
+ queryBuilder()
+ .sql(sql)
+ .planMatcher()
+ .include("Limit", "maxRecords=5")
+ .match();
+ }
+
+ @Test
+ public void testNonComplexFieldsWithCompressedFile() throws Exception {
+ generateCompressedFile("syslog/logs.syslog", "zip",
"syslog/logs.syslog.zip");
+
+ String sql = "SELECT event_date," +
+ "severity_code," +
+ "severity," +
+ "facility_code," +
+ "facility," +
+ "ip," +
+ "process_id," +
+ "message_id," +
+ "structured_data_text " +
+ "FROM dfs.`syslog/logs.syslog.zip`";
+
+ RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata expectedSchema = new SchemaBuilder()
+ .add("event_date", TypeProtos.MinorType.TIMESTAMP,
TypeProtos.DataMode.OPTIONAL)
+ .add("severity_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("severity", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("facility_code", TypeProtos.MinorType.INT,
TypeProtos.DataMode.OPTIONAL)
+ .add("facility", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+ .add("process_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("message_id", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .add("structured_data_text", TypeProtos.MinorType.VARCHAR,
TypeProtos.DataMode.OPTIONAL)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+ .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com",
null, "ID47", null)
+ .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com",
null, "ID47", null)
+ .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com",
null, "ID47", null)
+ .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com",
null, "ID47", null)
+ .addRow(1061727255000L, 2, "CRIT", 4, "AUTH", "mymachine.example.com",
null, "ID47", null)
+ .addRow(1061727255000L, 5, "NOTICE", 20, "LOCAL4", "192.0.2.1", "8710",
null, null)
+ .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4",
"mymachine.example.com", null, "ID47", "{examplePriority@32473=[class=high],
exampleSDID@32473=[iut=3, " +
+ "eventSource=Application, eventID=1011]}")
+ .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4",
"mymachine.example.com", null, "ID47", "{examplePriority@32473=[class=high],
exampleSDID@32473=[iut=3, " +
+ "eventSource=Application, eventID=1011]}")
+ .build();
+
new RowSetComparison(expected).verifyAndClearAll(results);
}
}
diff --git a/contrib/format-syslog/src/test/resources/syslog/test.syslog1
b/contrib/format-syslog/src/test/resources/syslog/test.syslog1
index d8e19d9..752bd60 100644
--- a/contrib/format-syslog/src/test/resources/syslog/test.syslog1
+++ b/contrib/format-syslog/src/test/resources/syslog/test.syslog1
@@ -1,2 +1,2 @@
<86>1 2015-08-05T21:58:59.693Z 192.168.2.132 SecureAuth0 23108 ID52020
[SecureAuth@27389 UserAgent="Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0;
rv:11.0) like Gecko" UserHostAddress="192.168.2.132"
BrowserSession="0gvhdi5udjuqtweprbgoxilc" Realm="SecureAuth0"
Appliance="secureauthqa.gosecureauth.com" Company="SecureAuth Corporation"
UserID="Tester2" PEN="27389" HostName="192.168.2.132" Category="AUDIT"
Priority="4"] Found the user for retrieving user's profile
-<134>1 2016-04-01T16:44:58Z MacBook-Pro-3 - 94473 - -
{"pid":94473,"hostname":"MacBook-Pro-3","level":30,"msg":"hello
world","time":1459529098958,"v":1}
\ No newline at end of file
+<134>1 2016-04-01T16:44:00.58Z MacBook-Pro-3 - 94473 - -
{"pid":94473,"hostname":"MacBook-Pro-3","level":30,"msg":"hello
world","time":1459529098958,"v":1}
\ No newline at end of file