This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 5d1dad7  DRILL-7532: Convert Syslog to EVF
5d1dad7 is described below

commit 5d1dad7828fe41a271e08cdb19cde84a1d78b268
Author: Charles Givre <[email protected]>
AuthorDate: Thu Dec 17 10:56:01 2020 -0500

    DRILL-7532: Convert Syslog to EVF
---
 contrib/format-syslog/pom.xml                      |   1 -
 .../drill/exec/store/syslog/SyslogBatchReader.java | 335 ++++++++++++++++++
 .../exec/store/syslog/SyslogFormatConfig.java      |   5 +-
 .../exec/store/syslog/SyslogFormatPlugin.java      | 101 +++---
 .../exec/store/syslog/SyslogRecordReader.java      | 379 ---------------------
 .../main/resources/bootstrap-format-plugins.json   |   6 +-
 .../drill/exec/store/syslog/TestSyslogFormat.java  | 360 +++++++++++--------
 .../src/test/resources/syslog/test.syslog1         |   2 +-
 8 files changed, 615 insertions(+), 574 deletions(-)
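
For context on the change: EVF (Drill's "enhanced vector framework", also called the row set
framework) replaces the old RecordReader with a ManagedReader that the scan framework drives
through an open/next/close lifecycle. As a rough orientation for the diff below, a minimal EVF
file reader has this shape (a sketch; only the ManagedReader interface methods are taken from
this commit, the class and comments are illustrative):

    import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
    import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
    import org.apache.drill.exec.physical.resultSet.RowSetLoader;

    public class MinimalBatchReader implements ManagedReader<FileSchemaNegotiator> {
      private RowSetLoader rowWriter;

      @Override
      public boolean open(FileSchemaNegotiator negotiator) {
        // Declare the table schema here, then obtain the row writer from the loader.
        rowWriter = negotiator.build().writer();
        return true;
      }

      @Override
      public boolean next() {
        // Fill the current batch one row at a time:
        //   rowWriter.start(); ...set column values...; rowWriter.save();
        // Return false once the input is exhausted.
        return false;
      }

      @Override
      public void close() {
        // Release any streams opened in open().
      }
    }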

diff --git a/contrib/format-syslog/pom.xml b/contrib/format-syslog/pom.xml
index 99052f3..54e67bd 100644
--- a/contrib/format-syslog/pom.xml
+++ b/contrib/format-syslog/pom.xml
@@ -50,7 +50,6 @@
       <version>${project.version}</version>
       <scope>test</scope>
     </dependency>
-
     <dependency>
       <groupId>org.apache.drill</groupId>
       <artifactId>drill-common</artifactId>
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java
new file mode 100644
index 0000000..6dd3e55
--- /dev/null
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogBatchReader.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.syslog;
+
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.ColumnMetadata;
+import org.apache.drill.exec.record.metadata.MetadataUtils;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.drill.exec.vector.accessor.TupleWriter;
+import org.apache.hadoop.mapred.FileSplit;
+import org.joda.time.Instant;
+import org.realityforge.jsyslog.message.StructuredDataParameter;
+import org.realityforge.jsyslog.message.SyslogMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+public class SyslogBatchReader implements ManagedReader<FileSchemaNegotiator> {
+  private static final Logger logger = LoggerFactory.getLogger(SyslogBatchReader.class);
+  private final String STRUCTURED_DATA_PREFIX = "structured_data_";
+  private final String STRUCTURED_DATA_MAP_NAME = "structured_data";
+  private final String RAW_COLUMN_NAME = "_raw";
+
+  private final int maxRecords;
+  private final SyslogFormatConfig config;
+  private final EasySubScan subScan;
+  private final Map<String, MinorType> mappedColumns = new LinkedHashMap<>();
+  private int lineCount;
+  private int errorCount;
+  private CustomErrorContext errorContext;
+  private InputStream fsStream;
+  private FileSplit split;
+  private BufferedReader reader;
+  private RowSetLoader rowWriter;
+  private List<ScalarWriter> writerArray;
+  private ScalarWriter rawColumnWriter;
+  private ScalarWriter messageWriter;
+  private TupleWriter structuredDataWriter;
+
+
+  public SyslogBatchReader(int maxRecords, SyslogFormatConfig config, EasySubScan scan) {
+    this.maxRecords = maxRecords;
+    this.config = config;
+    this.subScan = scan;
+    populateMappedColumns();
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    split = negotiator.split();
+    openFile(negotiator);
+    negotiator.tableSchema(buildSchema(), false);
+    errorContext = negotiator.parentErrorContext();
+
+    ResultSetLoader loader = negotiator.build();
+    rowWriter = loader.writer();
+    writerArray = populateRowWriters();
+    rawColumnWriter = rowWriter.scalar(RAW_COLUMN_NAME);
+    messageWriter = rowWriter.scalar("message");
+    return true;
+  }
+
+  @Override
+  public boolean next() {
+    while (!rowWriter.isFull()) {
+      if (!processNextLine()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    if (fsStream != null) {
+      AutoCloseables.closeSilently(fsStream);
+      fsStream = null;
+    }
+
+    if (reader != null) {
+      AutoCloseables.closeSilently(reader);
+      reader = null;
+    }
+  }
+
+  private void openFile(FileSchemaNegotiator negotiator) {
+    try {
+      fsStream = negotiator.fileSystem().openPossiblyCompressedStream(split.getPath());
+    } catch (IOException e) {
+      throw UserException
+        .dataReadError(e)
+        .message("Unable to open Syslog File %s", split.getPath())
+        .addContext(e.getMessage())
+        .addContext(errorContext)
+        .build(logger);
+    }
+    this.lineCount = 0;
+    reader = new BufferedReader(new InputStreamReader(fsStream));
+  }
+
+  public TupleMetadata buildSchema() {
+    SchemaBuilder builder = new SchemaBuilder();
+    for (Map.Entry<String, MinorType> entry : mappedColumns.entrySet()) {
+      builder.addNullable(entry.getKey(), entry.getValue());
+    }
+    if (! config.flattenStructuredData()) {
+      ColumnMetadata structuredDataMap = MetadataUtils.newMap(STRUCTURED_DATA_MAP_NAME);
+      builder.add(structuredDataMap);
+    }
+
+    builder.addNullable("message", MinorType.VARCHAR);
+
+    // Add _raw column
+    ColumnMetadata colSchema = MetadataUtils.newScalar(RAW_COLUMN_NAME, MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
+    colSchema.setBooleanProperty(ColumnMetadata.EXCLUDE_FROM_WILDCARD, true);
+    builder.add(colSchema);
+    return builder.buildSchema();
+  }
+
+  private List<ScalarWriter> populateRowWriters() {
+    List<ScalarWriter> writerArray = new ArrayList<>();
+    for (Map.Entry<String, MinorType> entry : mappedColumns.entrySet()) {
+      writerArray.add(rowWriter.scalar(entry.getKey()));
+    }
+
+    if (! config.flattenStructuredData()) {
+       structuredDataWriter = rowWriter.tuple(STRUCTURED_DATA_MAP_NAME);
+    }
+
+    return writerArray;
+  }
+
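+  // The fixed columns below correspond to the RFC 5424 syslog header fields
+  // (timestamp, severity/facility from PRI, hostname, app-name, procid, msgid),
+  // plus the raw structured-data text. Their insertion order matters, since
+  // writeStructuredColumns() addresses the resulting writers by index.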
+  private void populateMappedColumns() {
+    mappedColumns.put("event_date", MinorType.TIMESTAMP);
+    mappedColumns.put("severity_code", MinorType.INT);
+    mappedColumns.put("facility_code", MinorType.INT);
+    mappedColumns.put("severity", MinorType.VARCHAR);
+    mappedColumns.put("facility", MinorType.VARCHAR);
+    mappedColumns.put("ip", MinorType.VARCHAR);
+    mappedColumns.put("app_name", MinorType.VARCHAR);
+    mappedColumns.put("process_id", MinorType.VARCHAR);
+    mappedColumns.put("message_id", MinorType.VARCHAR);
+    mappedColumns.put("structured_data_text", MinorType.VARCHAR);
+  }
+
+  private boolean processNextLine() {
+    // Check to see if the limit has been reached
+    if (rowWriter.limitReached(maxRecords)) {
+      return false;
+    }
+
+    String line;
+    try {
+      line = reader.readLine();
+
+      // A null line means the end of the file has been reached
+      if (line == null) {
+        return false;
+      }
+
+      // Remove leading and trailing whitespace
+      line = line.trim();
+      if (line.length() == 0) {
+        // Skip empty lines
+        return true;
+      }
+
+      SyslogMessage parsedMessage = SyslogMessage.parseStructuredSyslogMessage(line);
+      rowWriter.start();
+      writeStructuredColumns(parsedMessage);
+      writeStructuredData(parsedMessage);
+
+      if (isProjected(rawColumnWriter)) {
+        rawColumnWriter.setString(line);
+      }
+
+      if (isProjected(messageWriter)) {
+        logger.debug("Message: {}", parsedMessage.getMessage());
+        messageWriter.setString(parsedMessage.getMessage());
+      }
+
+    } catch (IOException e) {
+      errorCount++;
+      if (errorCount > config.getMaxErrors()) {
+        throw UserException
+          .dataReadError()
+          .message("Maximum Error Threshold Exceeded. Error reading Syslog file at line %d", lineCount)
+          .addContext(e.getMessage())
+          .build(logger);
+      }
+    }
+    lineCount++;
+    rowWriter.save();
+    return true;
+  }
+
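+  // Writer indices here must match the insertion order in populateMappedColumns():
+  // 0=event_date, 1=severity_code, 2=facility_code, 3=severity, 4=facility,
+  // 5=ip, 6=app_name, 7=process_id, 8=message_id, 9=structured_data_text.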
+  private void writeStructuredColumns(SyslogMessage parsedMessage) {
+    long milliseconds = parsedMessage.getTimestamp().getMillis();
+    writerArray.get(0).setTimestamp(new Instant(milliseconds));
+    writerArray.get(1).setInt(parsedMessage.getLevel().ordinal());
+    writerArray.get(2).setInt(parsedMessage.getFacility().ordinal());
+    setString(writerArray.get(3), parsedMessage.getLevel().name());
+    setString(writerArray.get(4), parsedMessage.getFacility().name());
+    setString(writerArray.get(5), parsedMessage.getHostname());
+    setString(writerArray.get(6), parsedMessage.getAppName());
+    setString(writerArray.get(7), parsedMessage.getProcId());
+    setString(writerArray.get(8), parsedMessage.getMsgId());
+
+    Map<String, List<StructuredDataParameter>> structuredData = parsedMessage.getStructuredData();
+
+    if (structuredData != null) {
+      writerArray.get(9).setString(parsedMessage.getStructuredData().toString());
+    }
+    logger.debug("Successfully mapped known fields");
+  }
+
+  /**
+   * Write the flattened structured data fields to Drill vectors. The data in the structured fields is not known in
+   * advance and is not consistent between syslog entries, so we have to add these fields on the fly. The only
+   * possible data type in these cases is VARCHAR.
+   * @param parsedMessage The parsed syslog message
+   */
+  private void writeStructuredData(SyslogMessage parsedMessage) {
+    Map<String, List<StructuredDataParameter>> structuredData = parsedMessage.getStructuredData();
+    // Prevent NPE if there is no structured data text
+    if (structuredData == null) {
+      return;
+    }
+
+    if (config.flattenStructuredData()) {
+      // Iterate over the structured data fields and map to Drill vectors
+      for (Map.Entry<String, List<StructuredDataParameter>> entry : structuredData.entrySet()) {
+        for (StructuredDataParameter parameter : entry.getValue()) {
+          // These fields are not known in advance and are not necessarily consistent
+          String fieldName = STRUCTURED_DATA_PREFIX + parameter.getName();
+          String fieldValue = parameter.getValue();
+          writeStringColumn(rowWriter, fieldName, fieldValue);
+          logger.debug("Writing {} {}", fieldName, fieldValue);
+        }
+      }
+    } else {
+      writeStructuredDataToMap(structuredData);
+    }
+  }
+
+  private void writeStructuredDataToMap(Map<String, List<StructuredDataParameter>> structuredData) {
+    // Iterate over the structured data fields and write to a Drill map
+    for (Map.Entry<String, List<StructuredDataParameter>> entry : structuredData.entrySet()) {
+      for (StructuredDataParameter parameter : entry.getValue()) {
+        // These fields are not known in advance and are not necessarily consistent
+        String fieldName = parameter.getName();
+        String fieldValue = parameter.getValue();
+        writeStringColumn(structuredDataWriter, fieldName, fieldValue);
+      }
+    }
+  }
+
+  /**
+   * Writes data to a String column.  If there is no ScalarWriter for the particular column, this function will create one.
+   * @param rowWriter The TupleWriter to which we are writing
+   * @param name The field name to be written
+   * @param value The field value to be written
+   */
+  private void writeStringColumn(TupleWriter rowWriter, String name, String value) {
+    ScalarWriter colWriter = getColWriter(rowWriter, name, TypeProtos.MinorType.VARCHAR);
+    colWriter.setString(value);
+  }
+
+  private ScalarWriter getColWriter(TupleWriter tupleWriter, String fieldName, TypeProtos.MinorType type) {
+    int index = tupleWriter.tupleSchema().index(fieldName);
+    if (index == -1) {
+      ColumnMetadata colSchema = MetadataUtils.newScalar(fieldName, type, TypeProtos.DataMode.OPTIONAL);
+      index = tupleWriter.addColumn(colSchema);
+    }
+    return tupleWriter.scalar(index);
+  }
+
+  /**
+   * The ScalarWriter objects have a method to verify whether the writer is projected; however, it does not
+   * seem to take star queries into account.  This method checks whether the query is a star query and
+   * includes that in the determination of whether the column is projected.
+   * @param writer A ScalarWriter
+   * @return True if the column is projected, false if not.
+   */
+  private boolean isProjected(ScalarWriter writer) {
+    // Case for star query
+    if (subScan.getColumns().size() == 1 && subScan.getColumns().get(0).isDynamicStar()) {
+      return true;
+    } else {
+      return writer.isProjected();
+    }
+  }
+
+  private void setString(ScalarWriter writer, String value) {
+    if (value == null) {
+      return;
+    }
+    writer.setString(value);
+  }
+}
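
A note on the dynamic-column idiom above: structured-data keys are discovered at read time,
so getColWriter() adds a nullable VARCHAR column to the result set the first time a key
appears and reuses it afterwards. The same pattern, shown in isolation (a sketch with an
illustrative class and method name; the lookup/create logic mirrors getColWriter() above):

    import org.apache.drill.common.types.TypeProtos;
    import org.apache.drill.exec.record.metadata.ColumnMetadata;
    import org.apache.drill.exec.record.metadata.MetadataUtils;
    import org.apache.drill.exec.vector.accessor.ScalarWriter;
    import org.apache.drill.exec.vector.accessor.TupleWriter;

    public final class DynamicColumns {
      // Look up a column by name; create a nullable VARCHAR on first use.
      public static ScalarWriter varcharWriter(TupleWriter tuple, String name) {
        int index = tuple.tupleSchema().index(name);
        if (index == -1) {
          ColumnMetadata col = MetadataUtils.newScalar(
              name, TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL);
          index = tuple.addColumn(col);
        }
        return tuple.scalar(index);
      }
    }
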
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java
index e851e31..8f2fc3f 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatConfig.java
@@ -49,14 +49,17 @@ public class SyslogFormatConfig implements FormatPluginConfig {
     this.flattenStructuredData = flattenStructuredData == null ? false : flattenStructuredData;
   }
 
-  public boolean getFlattenStructuredData() {
+  @JsonProperty("flattenStructuredData")
+  public boolean flattenStructuredData() {
     return flattenStructuredData;
   }
 
+  @JsonProperty("maxErrors")
   public int getMaxErrors() {
     return maxErrors;
   }
 
+  @JsonProperty("extensions")
   public List<String> getExtensions() {
     return extensions;
   }
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
index d21035b..88b53d1 100644
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
+++ b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogFormatPlugin.java
@@ -18,82 +18,77 @@
 
 package org.apache.drill.exec.store.syslog;
 
-import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.planner.common.DrillStatsTable.TableStatistics;
+import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
 import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.util.List;
 
 public class SyslogFormatPlugin extends EasyFormatPlugin<SyslogFormatConfig> {
 
   public static final String DEFAULT_NAME = "syslog";
-  private final SyslogFormatConfig formatConfig;
 
-  public SyslogFormatPlugin(String name, DrillbitContext context,
-                            Configuration fsConf, StoragePluginConfig storageConfig,
-                            SyslogFormatConfig formatConfig) {
-    super(name, context, fsConf, storageConfig, formatConfig,
-            true,  // readable
-            false, // writable
-            true, // blockSplittable
-            true,  // compressible
-            Lists.newArrayList(formatConfig.getExtensions()),
-            DEFAULT_NAME);
-    this.formatConfig = formatConfig;
-  }
+  private static class SyslogReaderFactory extends FileReaderFactory {
 
-  @Override
-  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem dfs, FileWork fileWork,
-                                      List<SchemaPath> columns, String userName) {
-    return new SyslogRecordReader(context, dfs, fileWork, columns, userName, formatConfig);
-  }
+    private final int maxRecords;
+    private final SyslogFormatConfig formatConfig;
+    private final EasySubScan scan;
 
-  @Override
-  public boolean supportsPushDown() {
-    return true;
-  }
+    public SyslogReaderFactory(int maxRecords, SyslogFormatConfig formatConfig, EasySubScan scan) {
+      this.maxRecords = maxRecords;
+      this.formatConfig = formatConfig;
+      this.scan = scan;
+    }
 
-  @Override
-  public RecordWriter getRecordWriter(FragmentContext context,
-                                      EasyWriter writer) throws UnsupportedOperationException {
-    throw new UnsupportedOperationException("Drill does not support writing records to Syslog format.");
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      return new SyslogBatchReader(maxRecords, formatConfig, scan);
+    }
   }
 
-  @Override
-  public int getReaderOperatorType() {
-    return CoreOperatorType.SYSLOG_SUB_SCAN_VALUE;
+  public SyslogFormatPlugin(String name, DrillbitContext context,
+                          Configuration fsConf, StoragePluginConfig storageConfig,
+                          SyslogFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, formatConfig);
   }
 
-  @Override
-  public int getWriterOperatorType() {
-    throw new UnsupportedOperationException("Drill does not support writing records to Syslog format.");
+  private static EasyFormatConfig easyConfig(Configuration fsConf, SyslogFormatConfig pluginConfig) {
+    EasyFormatConfig config = new EasyFormatConfig();
+    config.readable = true;
+    config.writable = false;
+    config.blockSplittable = false;
+    config.compressible = true;
+    config.supportsProjectPushdown = true;
+    config.extensions = pluginConfig.getExtensions();
+    config.fsConf = fsConf;
+    config.defaultName = DEFAULT_NAME;
+    config.readerOperatorType = CoreOperatorType.SYSLOG_SUB_SCAN_VALUE;
+    config.useEnhancedScan = true;
+    config.supportsLimitPushdown = true;
+    return config;
   }
 
   @Override
-  public boolean supportsStatistics() {
-    return false;
+  public ManagedReader<? extends FileSchemaNegotiator> newBatchReader(
+    EasySubScan scan, OptionManager options)  {
+    return new SyslogBatchReader(scan.getMaxRecords(), formatConfig, scan);
   }
 
   @Override
-  public TableStatistics readStatistics(FileSystem fs, Path statsTablePath) {
-    throw new UnsupportedOperationException("unimplemented");
-  }
+  protected FileScanBuilder frameworkBuilder(OptionManager options, EasySubScan scan) {
+    FileScanBuilder builder = new FileScanBuilder();
+    builder.setReaderFactory(new SyslogReaderFactory(scan.getMaxRecords(), formatConfig, scan));
 
-  @Override
-  public void writeStatistics(TableStatistics statistics, FileSystem fs, Path 
statsTablePath) {
-    throw new UnsupportedOperationException("unimplemented");
+    initScanBuilder(builder, scan);
+    builder.nullType(Types.optional(TypeProtos.MinorType.VARCHAR));
+    return builder;
   }
 }
diff --git a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java b/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
deleted file mode 100644
index 0f39887..0000000
--- a/contrib/format-syslog/src/main/java/org/apache/drill/exec/store/syslog/SyslogRecordReader.java
+++ /dev/null
@@ -1,379 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.drill.exec.store.syslog;
-
-import io.netty.buffer.DrillBuf;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.exec.exception.OutOfMemoryException;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
-import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
-import org.apache.drill.exec.vector.complex.writer.BaseWriter;
-import org.realityforge.jsyslog.message.StructuredDataParameter;
-import org.realityforge.jsyslog.message.SyslogMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.BufferedReader;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.text.SimpleDateFormat;
-import java.util.List;
-import java.util.Map;
-
-public class SyslogRecordReader extends AbstractRecordReader {
-
-  private static final Logger logger = LoggerFactory.getLogger(SyslogRecordReader.class);
-  private static final int MAX_RECORDS_PER_BATCH = 4096;
-
-  private final DrillFileSystem fileSystem;
-  private final FileWork fileWork;
-  private final String userName;
-  private BufferedReader reader;
-  private DrillBuf buffer;
-  private VectorContainerWriter writer;
-  private final int maxErrors;
-  private final boolean flattenStructuredData;
-  private int errorCount;
-  private int lineCount;
-  private final List<SchemaPath> projectedColumns;
-  private String line;
-
-  public SyslogRecordReader(FragmentContext context,
-                            DrillFileSystem fileSystem,
-                            FileWork fileWork,
-                            List<SchemaPath> columns,
-                            String userName,
-                            SyslogFormatConfig config) throws OutOfMemoryException {
-
-    this.fileSystem = fileSystem;
-    this.fileWork = fileWork;
-    this.userName = userName;
-    this.maxErrors = config.getMaxErrors();
-    this.errorCount = 0;
-    this.buffer = context.getManagedBuffer().reallocIfNeeded(4096);
-    this.projectedColumns = columns;
-    this.flattenStructuredData = config.getFlattenStructuredData();
-
-    setColumns(columns);
-  }
-
-  @Override
-  public void setup(final OperatorContext context, final OutputMutator output) {
-    openFile();
-    this.writer = new VectorContainerWriter(output);
-  }
-
-  private void openFile() {
-    InputStream in;
-    try {
-      in = fileSystem.openPossiblyCompressedStream(fileWork.getPath());
-    } catch (Exception e) {
-      throw UserException
-              .dataReadError(e)
-              .message("Failed to open open input file: %s", fileWork.getPath())
-              .addContext("User name", this.userName)
-              .build(logger);
-    }
-    this.lineCount = 0;
-    reader = new BufferedReader(new InputStreamReader(in));
-  }
-
-  @Override
-  public int next() {
-    this.writer.allocate();
-    this.writer.reset();
-
-    int recordCount = 0;
-
-    try {
-      BaseWriter.MapWriter map = this.writer.rootAsMap();
-      String line;
-
-      while (recordCount < MAX_RECORDS_PER_BATCH && (line = this.reader.readLine()) != null) {
-        lineCount++;
-
-        // Skip empty lines
-        line = line.trim();
-        if (line.length() == 0) {
-          continue;
-        }
-        this.line = line;
-
-        try {
-          SyslogMessage parsedMessage = SyslogMessage.parseStructuredSyslogMessage(line);
-
-          this.writer.setPosition(recordCount);
-          map.start();
-
-          if (isStarQuery()) {
-            writeAllColumns(map, parsedMessage);
-          } else {
-            writeProjectedColumns(map, parsedMessage);
-          }
-          map.end();
-          recordCount++;
-
-        } catch (Exception e) {
-          errorCount++;
-          if (errorCount > maxErrors) {
-            throw UserException
-                    .dataReadError()
-                    .message("Maximum Error Threshold Exceeded: ")
-                    .addContext("Line: " + lineCount)
-                    .addContext(e.getMessage())
-                    .build(logger);
-          }
-        }
-      }
-
-      this.writer.setValueCount(recordCount);
-      return recordCount;
-
-    } catch (final Exception e) {
-      errorCount++;
-      if (errorCount > maxErrors) {
-        throw UserException.dataReadError()
-                .message("Error parsing file")
-                .addContext(e.getMessage())
-                .build(logger);
-      }
-    }
-
-    return recordCount;
-  }
-
-  private void writeAllColumns(BaseWriter.MapWriter map, SyslogMessage parsedMessage) {
-
-    long milliseconds = 0;
-    try {
-      milliseconds = parsedMessage.getTimestamp().getMillis();
-    } catch (final Exception e) {
-      errorCount++;
-      if (errorCount > maxErrors) {
-        throw UserException.dataReadError()
-                .message("Syslog Format Plugin: Error parsing date")
-                .addContext(e.getMessage())
-                .build(logger);
-      }
-    }
-    map.timeStamp("event_date").writeTimeStamp(milliseconds);
-    map.integer("severity_code").writeInt(parsedMessage.getLevel().ordinal());
-    map.integer("facility_code").writeInt(parsedMessage.getFacility().ordinal());
-
-    mapStringField("severity", parsedMessage.getLevel().name(), map);
-    mapStringField("facility", parsedMessage.getFacility().name(), map);
-    mapStringField("ip", parsedMessage.getHostname(), map);
-    mapStringField("app_name", parsedMessage.getAppName(), map);
-    mapStringField("process_id", parsedMessage.getProcId(), map);
-    mapStringField("message_id", parsedMessage.getMsgId(), map);
-
-    if (parsedMessage.getStructuredData() != null) {
-      mapStringField("structured_data_text", parsedMessage.getStructuredData().toString(), map);
-      Map<String, List<StructuredDataParameter>> structuredData = parsedMessage.getStructuredData();
-      if (flattenStructuredData) {
-        mapFlattenedStructuredData(structuredData, map);
-      } else {
-        mapComplexField("structured_data", structuredData, map);
-      }
-    }
-    mapStringField("message", parsedMessage.getMessage(), map);
-  }
-
-  private void writeProjectedColumns(BaseWriter.MapWriter map, SyslogMessage parsedMessage) throws UserException {
-    String columnName;
-
-    for (SchemaPath col : projectedColumns) {
-
-      //Case for nested fields
-      if (col.getAsNamePart().hasChild()) {
-        String fieldName = col.getAsNamePart().getChild().getName();
-        mapStructuredDataField(fieldName, map, parsedMessage);
-      } else {
-        columnName = col.getAsNamePart().getName();
-
-        //Extracts fields from structured data IF the user selected to flatten these fields
-        if ((!columnName.equals("structured_data_text")) && columnName.startsWith("structured_data_")) {
-          String fieldName = columnName.replace("structured_data_", "");
-          String value = getFieldFromStructuredData(fieldName, parsedMessage);
-          mapStringField(columnName, value, map);
-        } else {
-          switch (columnName) {
-            case "event_date":
-              long milliseconds = parsedMessage.getTimestamp().getMillis(); //TODO put in try/catch
-              map.timeStamp("event_date").writeTimeStamp(milliseconds);
-              break;
-            case "severity_code":
-              map.integer("severity_code").writeInt(parsedMessage.getLevel().ordinal());
-              break;
-            case "facility_code":
-              map.integer("facility_code").writeInt(parsedMessage.getFacility().ordinal());
-              break;
-            case "severity":
-              mapStringField("severity", parsedMessage.getLevel().name(), map);
-              break;
-            case "facility":
-              mapStringField("facility", parsedMessage.getFacility().name(), map);
-              break;
-            case "ip":
-              mapStringField("ip", parsedMessage.getHostname(), map);
-              break;
-            case "app_name":
-              mapStringField("app_name", parsedMessage.getAppName(), map);
-              break;
-            case "process_id":
-              mapStringField("process_id", parsedMessage.getProcId(), map);
-              break;
-            case "msg_id":
-              mapStringField("message_id", parsedMessage.getMsgId(), map);
-              break;
-            case "structured_data":
-              if (parsedMessage.getStructuredData() != null) {
-                Map<String, List<StructuredDataParameter>> structured_data = parsedMessage.getStructuredData();
-                mapComplexField("structured_data", structured_data, map);
-              }
-              break;
-            case "structured_data_text":
-              if (parsedMessage.getStructuredData() != null) {
-                mapStringField("structured_data_text", parsedMessage.getStructuredData().toString(), map);
-              } else {
-                mapStringField("structured_data_text", "", map);
-              }
-              break;
-            case "message":
-              mapStringField("message", parsedMessage.getMessage(), map);
-              break;
-            case "_raw":
-              mapStringField("_raw", this.line, map);
-              break;
-
-            default:
-              mapStringField(columnName, "", map);
-          }
-        }
-      }
-    }
-  }
-
-  //Helper function to map strings
-  private void mapStringField(String name, String value, BaseWriter.MapWriter map) {
-    if (value == null) {
-      return;
-    }
-    try {
-      byte[] bytes = value.getBytes(StandardCharsets.UTF_8);
-      int stringLength = bytes.length;
-      this.buffer = buffer.reallocIfNeeded(stringLength);
-      this.buffer.setBytes(0, bytes, 0, stringLength);
-      map.varChar(name).writeVarChar(0, stringLength, buffer);
-    } catch (Exception e) {
-      throw UserException
-              .dataWriteError()
-              .addContext("Could not write string: ")
-              .addContext(e.getMessage())
-              .build(logger);
-    }
-  }
-
-  //Helper function to flatten structured data
-  private void mapFlattenedStructuredData(Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
-    for (Map.Entry<String, List<StructuredDataParameter>> entry : data.entrySet()) {
-      for (StructuredDataParameter parameter : entry.getValue()) {
-        String fieldName = "structured_data_" + parameter.getName();
-        String fieldValue = parameter.getValue();
-        mapStringField(fieldName, fieldValue, map);
-      }
-    }
-  }
-
-  //Gets field from the Structured Data Construct
-  private String getFieldFromStructuredData(String fieldName, SyslogMessage parsedMessage) {
-    for (Map.Entry<String, List<StructuredDataParameter>> entry : parsedMessage.getStructuredData().entrySet()) {
-      for (StructuredDataParameter d : entry.getValue()) {
-        if (d.getName().equals(fieldName)) {
-          return d.getValue();
-        }
-      }
-    }
-    return null;
-  }
-
-  //Helper function to map arrays
-  private void mapComplexField(String mapName, Map<String, List<StructuredDataParameter>> data, BaseWriter.MapWriter map) {
-    for (Map.Entry<String, List<StructuredDataParameter>> entry : data.entrySet()) {
-      List<StructuredDataParameter> dataParameters = entry.getValue();
-      String fieldName;
-      String fieldValue;
-
-      for (StructuredDataParameter parameter : dataParameters) {
-        fieldName = parameter.getName();
-        fieldValue = parameter.getValue();
-
-        VarCharHolder rowHolder = new VarCharHolder();
-
-        byte[] rowStringBytes = fieldValue.getBytes();
-        this.buffer.reallocIfNeeded(rowStringBytes.length);
-        this.buffer.setBytes(0, rowStringBytes);
-        rowHolder.start = 0;
-        rowHolder.end = rowStringBytes.length;
-        rowHolder.buffer = this.buffer;
-
-        map.map(mapName).varChar(fieldName).write(rowHolder);
-      }
-    }
-  }
-
-  private void mapStructuredDataField(String fieldName, BaseWriter.MapWriter map, SyslogMessage parsedMessage) {
-    String fieldValue = getFieldFromStructuredData(fieldName, parsedMessage);
-    VarCharHolder rowHolder = new VarCharHolder();
-
-    byte[] rowStringBytes = fieldValue.getBytes();
-    this.buffer.reallocIfNeeded(rowStringBytes.length);
-    this.buffer.setBytes(0, rowStringBytes);
-    rowHolder.start = 0;
-    rowHolder.end = rowStringBytes.length;
-    rowHolder.buffer = this.buffer;
-
-    map.map("structured_data").varChar(fieldName).write(rowHolder);
-  }
-
-  public SimpleDateFormat getValidDateObject(String d) {
-    SimpleDateFormat tempDateFormat;
-    if (d != null && !d.isEmpty()) {
-      tempDateFormat = new SimpleDateFormat(d);
-    } else {
-      throw UserException
-              .parseError()
-              .message("Invalid date format")
-              .build(logger);
-    }
-    return tempDateFormat;
-  }
-
-  public void close() throws Exception {
-    this.reader.close();
-  }
-}
diff --git a/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json b/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json
index ee5a396..ea55012 100644
--- a/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json
+++ b/contrib/format-syslog/src/main/resources/bootstrap-format-plugins.json
@@ -7,7 +7,8 @@
           "type": "syslog",
           "extensions": [
             "syslog"
-          ]
+          ],
+          "maxErrors" : 0
         }
       }
     },
@@ -18,7 +19,8 @@
           "type": "syslog",
           "extensions": [
             "syslog"
-          ]
+          ],
+          "maxErrors" : 0
         }
       }
     }
diff --git a/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java b/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
index c7bd833..4fb15c5 100644
--- a/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
+++ b/contrib/format-syslog/src/test/java/org/apache/drill/exec/store/syslog/TestSyslogFormat.java
@@ -17,17 +17,19 @@
  */
 package org.apache.drill.exec.store.syslog;
 
+import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.logical.FormatPluginConfig;
 import org.apache.drill.common.types.TypeProtos;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.metadata.TupleMetadata;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.test.ClusterTest;
-import org.apache.drill.test.BaseDirTestWatcher;
 import org.apache.drill.exec.physical.rowSet.RowSet;
 import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
 import org.apache.drill.test.ClusterFixture;
@@ -35,20 +37,26 @@ import org.apache.drill.test.rowSet.RowSetComparison;
 import org.apache.drill.exec.record.metadata.SchemaBuilder;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
 
-public class TestSyslogFormat extends ClusterTest {
+import static org.apache.drill.test.QueryTestUtil.generateCompressedFile;
+import static org.apache.drill.test.rowSet.RowSetUtilities.mapArray;
+import static org.junit.Assert.assertEquals;
 
-  @ClassRule
-  public static final BaseDirTestWatcher dirTestWatcher = new BaseDirTestWatcher();
+@Category(RowSetTests.class)
+public class TestSyslogFormat extends ClusterTest {
 
   @BeforeClass
   public static void setup() throws Exception {
-    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+
+    // Needed for compressed file unit test
+    dirTestWatcher.copyResourceToRoot(Paths.get("syslog/"));
+
     defineSyslogPlugin();
   }
 
-  private static void defineSyslogPlugin() throws ExecutionSetupException {
+  private static void defineSyslogPlugin() {
     Map<String, FormatPluginConfig> formats = new HashMap<>();
     formats.put("sample", new SyslogFormatConfig(
         Collections.singletonList("syslog"), null, null));
@@ -58,6 +66,7 @@ public class TestSyslogFormat extends ClusterTest {
 
     // Define a temporary plugin for the "cp" storage plugin.
     cluster.defineFormats("cp", formats);
+    cluster.defineFormats("dfs", formats);
   }
 
   @Test
@@ -76,88 +85,91 @@ public class TestSyslogFormat extends ClusterTest {
     RowSet results = client.queryBuilder().sql(sql).rowSet();
 
     TupleMetadata expectedSchema = new SchemaBuilder()
-            .add("event_date", TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL)
-            .add("severity_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
-            .add("severity", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("facility_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
-            .add("facility", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("process_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("message_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_text", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .buildSchema();
+      .add("event_date", TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL)
+      .add("severity_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("severity", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("facility_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("facility", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("process_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("message_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_text", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-            .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "", "")
-            .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "", "")
-            .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "", "")
-            .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "", "")
-            .addRow(1061727255000L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "", "")
-            .addRow(1061727255000L, 5, "NOTICE", 20, "LOCAL4", "192.0.2.1", "8710", "", "")
-            .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4", "mymachine.example.com", null, "", "{examplePriority@32473=[class=high], exampleSDID@32473=[iut=3, eventSource=Application, eventID=1011]}")
-            .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4", "mymachine.example.com", null, "", "{examplePriority@32473=[class=high], exampleSDID@32473=[iut=3, eventSource=Application, eventID=1011]}")
-            .build();
+      .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "ID47", null)
+      .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "ID47", null)
+      .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "ID47", null)
+      .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "ID47", null)
+      .addRow(1061727255000L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "ID47", null)
+      .addRow(1061727255000L, 5, "NOTICE", 20, "LOCAL4", "192.0.2.1", "8710", null, null)
+      .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4", "mymachine.example.com", null, "ID47", "{examplePriority@32473=[class=high], exampleSDID@32473=[iut=3, " +
+    "eventSource=Application, eventID=1011]}")
+      .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4", "mymachine.example.com", null, "ID47", "{examplePriority@32473=[class=high], exampleSDID@32473=[iut=3, " +
+    "eventSource=Application, eventID=1011]}")
+      .build();
 
     new RowSetComparison(expected).verifyAndClearAll(results);
   }
 
   @Test
-  public void testStarQuery() throws RpcException {
+  public void testStarQuery() throws Exception {
     String sql = "SELECT * FROM cp.`syslog/logs1.syslog`";
 
-
     RowSet results = client.queryBuilder().sql(sql).rowSet();
-
     TupleMetadata expectedSchema = new SchemaBuilder()
-            .add("event_date", TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL)
-            .add("severity_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
-            .add("facility_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
-            .add("severity", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("facility", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("app_name", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("message_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("message", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("process_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .buildSchema();
+      .add("event_date", TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL)
+      .add("severity_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("facility_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
+      .add("severity", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("facility", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("app_name", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("process_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("message_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_text", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data", MinorType.MAP, DataMode.REQUIRED)
+      .add("message", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-            .addRow(1065910455003L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on /dev/pts/8", null)
-            .addRow(482196050520L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on /dev/pts/8", null)
-            .addRow(482196050520L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on /dev/pts/8", null)
-            .addRow(1065910455003L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on /dev/pts/8", null)
-            .addRow(1061727255000L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", "ID47", "BOM'su root' failed for lonvick on /dev/pts/8", null)
-            .addRow(1061727255000L, 5, 20, "NOTICE", "LOCAL4", "192.0.2.1", "myproc", null, "%% It's time to make the do-nuts.", "8710")
-            .build();
-
+      .addRow(1065910455003L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", null, "ID47", null, mapArray(), "BOM'su root' failed for lonvick on /dev/pts/8")
+      .addRow(482196050520L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", null, "ID47", null, mapArray(), "BOM'su root' failed for lonvick on /dev/pts/8")
+      .addRow(482196050520L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", null, "ID47", null, mapArray(), "BOM'su root' failed for lonvick on /dev/pts/8")
+      .addRow(1065910455003L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", null, "ID47", null, mapArray(), "BOM'su root' failed for lonvick on /dev/pts/8")
+      .addRow(1061727255000L, 2, 4, "CRIT", "AUTH", "mymachine.example.com", "su", null, "ID47", null, mapArray(), "BOM'su root' failed for lonvick on /dev/pts/8")
+      .addRow(1061727255000L, 5, 20, "NOTICE", "LOCAL4", "192.0.2.1", "myproc", "8710", null, null, mapArray(), "%% It's time to make the do-nuts.")
+      .build();
+
+    assertEquals(6, results.rowCount());
     new RowSetComparison(expected).verifyAndClearAll(results);
   }
 
   @Test
-  public void testRawQuery() throws RpcException {
+  public void testRawQuery() throws Exception {
     String sql = "SELECT _raw FROM cp.`syslog/logs.syslog`";
 
     RowSet results = client.queryBuilder().sql(sql).rowSet();
     TupleMetadata expectedSchema = new SchemaBuilder()
-            .add("_raw", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .buildSchema();
+      .add("_raw", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-            .addRow("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
-            .addRow("<34>1 1985-04-12T19:20:50.52-04:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
-            .addRow("<34>1 1985-04-12T23:20:50.52Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
-            .addRow("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
-            .addRow("<34>1 2003-08-24T05:14:15.000003-07:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
-            .addRow("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 - - %% It's time to make the do-nuts.")
-            .addRow("<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"][examplePriority@32473 class=\"high\"]")
-            .addRow("<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"][examplePriority@32473 class=\"high\"] - and thats a wrap!")
-            .build();
+      .addRow("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
+      .addRow("<34>1 1985-04-12T19:20:50.52-04:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
+      .addRow("<34>1 1985-04-12T23:20:50.52Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
+      .addRow("<34>1 2003-10-11T22:14:15.003Z mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
+      .addRow("<34>1 2003-08-24T05:14:15.000003-07:00 mymachine.example.com su - ID47 - BOM'su root' failed for lonvick on /dev/pts/8")
+      .addRow("<165>1 2003-08-24T05:14:15.000003-07:00 192.0.2.1 myproc 8710 - - %% It's time to make the do-nuts.")
+      .addRow("<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"][examplePriority@32473 class=\"high\"]")
+      .addRow("<165>1 2003-10-11T22:14:15.003Z mymachine.example.com evntslog - ID47 [exampleSDID@32473 iut=\"3\" eventSource=\"Application\" eventID=\"1011\"][examplePriority@32473 class=\"high\"] - and thats a wrap!")
+      .build();
 
     new RowSetComparison(expected).verifyAndClearAll(results);
   }
 
   @Test
-  public void testStructuredDataQuery() throws RpcException {
+  public void testStructuredDataQuery() throws Exception {
     String sql = "SELECT syslog_data.`structured_data`.`UserAgent` AS UserAgent, " +
             "syslog_data.`structured_data`.`UserHostAddress` AS UserHostAddress," +
             "syslog_data.`structured_data`.`BrowserSession` AS BrowserSession," +
@@ -177,71 +189,72 @@ public class TestSyslogFormat extends ClusterTest {
 
     RowSet results = client.queryBuilder().sql(sql).rowSet();
     TupleMetadata expectedSchema = new SchemaBuilder()
-            .add("UserAgent", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("UserHostAddress", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("BrowserSession", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("Realm", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("Appliance", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("Company", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("UserID", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("PEN", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("HostName", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("Category", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("Priority", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .buildSchema();
+      .add("UserAgent", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("UserHostAddress", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("BrowserSession", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("Realm", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("Appliance", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("Company", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("UserID", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("PEN", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("HostName", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("Category", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("Priority", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-            .addRow("Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", "192.168.2.132",
-                    "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0", "secureauthqa.gosecureauth.com", "SecureAuth Corporation",
-                    "Tester2", "27389", "192.168.2.132", "AUDIT", "4")
-            .build();
+      .addRow("Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", "192.168.2.132",
+              "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0", "secureauthqa.gosecureauth.com", "SecureAuth Corporation",
+              "Tester2", "27389", "192.168.2.132", "AUDIT", "4")
+      .build();
 
     new RowSetComparison(expected).verifyAndClearAll(results);
   }
 
   @Test
-  public void testStarFlattenedStructuredDataQuery() throws RpcException {
+  public void testStarFlattenedStructuredDataQuery() throws Exception {
     String sql = "SELECT * FROM cp.`syslog/test.syslog1`";
 
     RowSet results = client.queryBuilder().sql(sql).rowSet();
     TupleMetadata expectedSchema = new SchemaBuilder()
-            .add("event_date", TypeProtos.MinorType.TIMESTAMP, TypeProtos.DataMode.OPTIONAL)
-            .add("severity_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
-            .add("facility_code", TypeProtos.MinorType.INT, TypeProtos.DataMode.OPTIONAL)
-            .add("severity", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("facility", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("app_name", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("process_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("message_id", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_text", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_UserAgent", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_UserHostAddress", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_BrowserSession", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_Realm", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_Appliance", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_Company", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_UserID", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_PEN", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_HostName", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_Category", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_Priority", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("message", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .buildSchema();
+      .add("event_date", TypeProtos.MinorType.TIMESTAMP, 
TypeProtos.DataMode.OPTIONAL)
+      .add("severity_code", TypeProtos.MinorType.INT, 
TypeProtos.DataMode.OPTIONAL)
+      .add("facility_code", TypeProtos.MinorType.INT, 
TypeProtos.DataMode.OPTIONAL)
+      .add("severity", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("facility", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("app_name", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("process_id", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("message_id", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_text", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("message", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_UserAgent", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_UserHostAddress", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_BrowserSession", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_Realm", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_Appliance", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_Company", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_UserID", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_PEN", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_HostName", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_Category", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_Priority", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-            .addRow(1438811939693L, 6, 10, "INFO", "AUTHPRIV", "192.168.2.132", "SecureAuth0", "23108", "ID52020",
-                    "{SecureAuth@27389=[UserAgent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko, UserHostAddress=192.168.2.132, BrowserSession=0gvhdi5udjuqtweprbgoxilc, Realm=SecureAuth0, Appliance=secureauthqa.gosecureauth.com, Company=SecureAuth Corporation, UserID=Tester2, PEN=27389, HostName=192.168.2.132, Category=AUDIT, Priority=4]}",
-                    "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", "192.168.2.132",
-                    "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0", "secureauthqa.gosecureauth.com", "SecureAuth Corporation",
-                    "Tester2", "27389", "192.168.2.132", "AUDIT", "4", "Found the user for retrieving user's profile")
+      .addRow(1438811939693L, 6, 10, "INFO", "AUTHPRIV", "192.168.2.132", "SecureAuth0", "23108", "ID52020", "{SecureAuth@27389=[UserAgent=Mozilla/5.0 (Windows NT 6.1; WOW64; " +
+        "Trident/7.0; rv:11.0) like Gecko, UserHostAddress=192.168.2.132, BrowserSession=0gvhdi5udjuqtweprbgoxilc, Realm=SecureAuth0, Appliance=secureauthqa.gosecureauth.com, " +
+        "Company=SecureAuth Corporation, UserID=Tester2, PEN=27389, HostName=192.168.2.132, Category=AUDIT, Priority=4]}", "Found the user for retrieving user's profile", "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", "192.168.2.132", "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0", "secureauthqa.gosecureauth.com", "SecureAuth Corporation", "Tester2", "27389", "192.168.2.132", "AUDIT", "4")
+      .addRow(1459529040580L, 6, 16, "INFO", "LOCAL0", "MacBook-Pro-3", null, "94473", null, null,
+        "{\"pid\":94473,\"hostname\":\"MacBook-Pro-3\",\"level\":30,\"msg\":\"hello world\",\"time\":1459529098958,\"v\":1}", null, null, null, null, null, null, null, null, null, null, null)
             .build();
 
+    assertEquals(2, results.rowCount());
     new RowSetComparison(expected).verifyAndClearAll(results);
   }
 
   @Test
-  public void testExplicitFlattenedStructuredDataQuery() throws RpcException {
+  public void testExplicitFlattenedStructuredDataQuery() throws Exception {
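+    // Projects the same flattened structured-data columns explicitly rather than via SELECT *.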
     String sql = "SELECT event_date," +
             "severity_code," +
             "facility_code," +
@@ -268,39 +281,112 @@ public class TestSyslogFormat extends ClusterTest {
 
     RowSet results = client.queryBuilder().sql(sql).rowSet();
     TupleMetadata expectedSchema = new SchemaBuilder()
-            .add("event_date", TypeProtos.MinorType.TIMESTAMP, 
TypeProtos.DataMode.OPTIONAL)
-            .add("severity_code", TypeProtos.MinorType.INT, 
TypeProtos.DataMode.OPTIONAL)
-            .add("facility_code", TypeProtos.MinorType.INT, 
TypeProtos.DataMode.OPTIONAL)
-            .add("severity", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("facility", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("ip", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("app_name", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("process_id", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("message_id", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_text", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_UserAgent", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_UserHostAddress", 
TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_BrowserSession", 
TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_Realm", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_Appliance", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_Company", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_UserID", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_PEN", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_HostName", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_Category", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("structured_data_Priority", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .add("message", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
-            .buildSchema();
-
+      .add("event_date", TypeProtos.MinorType.TIMESTAMP, 
TypeProtos.DataMode.OPTIONAL)
+      .add("severity_code", TypeProtos.MinorType.INT, 
TypeProtos.DataMode.OPTIONAL)
+      .add("facility_code", TypeProtos.MinorType.INT, 
TypeProtos.DataMode.OPTIONAL)
+      .add("severity", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("facility", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("app_name", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("process_id", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("message_id", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_text", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_UserAgent", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_UserHostAddress", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_BrowserSession", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_Realm", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_Appliance", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_Company", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_UserID", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_PEN", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_HostName", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_Category", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_Priority", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("message", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
+    assertEquals(2, results.rowCount());
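+    // Row 1 is an RFC 5424 record with structured data; row 2 carries a raw JSON message, so its structured columns are null.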
 
     RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-            .addRow(1438811939693L, 6, 10, "INFO", "AUTHPRIV", "192.168.2.132", "SecureAuth0", "23108", "",
-                    "{SecureAuth@27389=[UserAgent=Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko, UserHostAddress=192.168.2.132, BrowserSession=0gvhdi5udjuqtweprbgoxilc, Realm=SecureAuth0, Appliance=secureauthqa.gosecureauth.com, Company=SecureAuth Corporation, UserID=Tester2, PEN=27389, HostName=192.168.2.132, Category=AUDIT, Priority=4]}",
-                    "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", "192.168.2.132",
-                    "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0", "secureauthqa.gosecureauth.com", "SecureAuth Corporation",
-                    "Tester2", "27389", "192.168.2.132", "AUDIT", "4", "Found the user for retrieving user's profile")
+      .addRow(1438811939693L, 6, 10, "INFO", "AUTHPRIV", "192.168.2.132", "SecureAuth0", "23108", "ID52020", "{SecureAuth@27389=[UserAgent=Mozilla/5.0 (Windows NT 6.1; " +
+    "WOW64; Trident/7.0; rv:11.0) like Gecko, UserHostAddress=192.168.2.132, BrowserSession=0gvhdi5udjuqtweprbgoxilc, Realm=SecureAuth0, Appliance=secureauthqa.gosecureauth.com," +
+    " Company=SecureAuth Corporation, UserID=Tester2, PEN=27389, HostName=192.168.2.132, Category=AUDIT, Priority=4]}", "Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko", "192.168.2.132", "0gvhdi5udjuqtweprbgoxilc", "SecureAuth0", "secureauthqa.gosecureauth.com", "SecureAuth Corporation", "Tester2", "27389", "192.168.2.132", "AUDIT", "4", "Found the user for retrieving user's profile")
+      .addRow(1459529040580L, 6, 16, "INFO", "LOCAL0", "MacBook-Pro-3", null, "94473", null, null, null, null, null, null, null, null, null, null, null, null, null,
+    "{\"pid\":94473,\"hostname\":\"MacBook-Pro-3\",\"level\":30,\"msg\":\"hello world\",\"time\":1459529098958,\"v\":1}")
             .build();
 
+    assertEquals(2, results.rowCount());
+    new RowSetComparison(expected).verifyAndClearAll(results);
+  }
+
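+  // COUNT(*) should count all six records in the sample log file.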
+  @Test
+  public void testCount() throws Exception {
+    String sql = "SELECT COUNT(*) FROM cp.`syslog/logs1.syslog` ";
+    long result = client.queryBuilder().sql(sql).singletonLong();
+    assertEquals(6L, result);
+  }
+
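+  // Serializes the plan to JSON and re-executes it to verify plan round-tripping for this format.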
+  @Test
+  public void testSerDe() throws Exception {
+    String sql = "SELECT COUNT(*) AS cnt FROM dfs.`syslog/logs1.syslog`";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+    assertEquals("Counts should match",6L, cnt);
+  }
+
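+  // Verifies that LIMIT is pushed into the scan so the reader stops after five records.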
+  @Test
+  public void testLimitPushdown() throws Exception {
+    String sql = "SELECT * FROM cp.`syslog/logs1.syslog` LIMIT 5";
+
+    queryBuilder()
+      .sql(sql)
+      .planMatcher()
+      .include("Limit", "maxRecords=5")
+      .match();
+  }
+
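+  // The reader should handle a zip-compressed syslog file transparently.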
+  @Test
+  public void testNonComplexFieldsWithCompressedFile() throws Exception {
+    generateCompressedFile("syslog/logs.syslog", "zip", "syslog/logs.syslog.zip");
+
+    String sql = "SELECT event_date," +
+      "severity_code," +
+      "severity," +
+      "facility_code," +
+      "facility," +
+      "ip," +
+      "process_id," +
+      "message_id," +
+      "structured_data_text " +
+      "FROM dfs.`syslog/logs.syslog.zip`";
+
+    RowSet results = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata expectedSchema = new SchemaBuilder()
+      .add("event_date", TypeProtos.MinorType.TIMESTAMP, 
TypeProtos.DataMode.OPTIONAL)
+      .add("severity_code", TypeProtos.MinorType.INT, 
TypeProtos.DataMode.OPTIONAL)
+      .add("severity", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("facility_code", TypeProtos.MinorType.INT, 
TypeProtos.DataMode.OPTIONAL)
+      .add("facility", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("ip", TypeProtos.MinorType.VARCHAR, TypeProtos.DataMode.OPTIONAL)
+      .add("process_id", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("message_id", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .add("structured_data_text", TypeProtos.MinorType.VARCHAR, 
TypeProtos.DataMode.OPTIONAL)
+      .buildSchema();
+
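+    // Expected rows from the decompressed file; nulls mark fields absent from a given record.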
+    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
+      .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "ID47", null)
+      .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "ID47", null)
+      .addRow(482196050520L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "ID47", null)
+      .addRow(1065910455003L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "ID47", null)
+      .addRow(1061727255000L, 2, "CRIT", 4, "AUTH", "mymachine.example.com", null, "ID47", null)
+      .addRow(1061727255000L, 5, "NOTICE", 20, "LOCAL4", "192.0.2.1", "8710", null, null)
+      .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4", "mymachine.example.com", null, "ID47", "{examplePriority@32473=[class=high], exampleSDID@32473=[iut=3, " +
+        "eventSource=Application, eventID=1011]}")
+      .addRow(1065910455003L, 5, "NOTICE", 20, "LOCAL4", "mymachine.example.com", null, "ID47", "{examplePriority@32473=[class=high], exampleSDID@32473=[iut=3, " +
+        "eventSource=Application, eventID=1011]}")
+      .build();
+
     new RowSetComparison(expected).verifyAndClearAll(results);
   }
 }
diff --git a/contrib/format-syslog/src/test/resources/syslog/test.syslog1 b/contrib/format-syslog/src/test/resources/syslog/test.syslog1
index d8e19d9..752bd60 100644
--- a/contrib/format-syslog/src/test/resources/syslog/test.syslog1
+++ b/contrib/format-syslog/src/test/resources/syslog/test.syslog1
@@ -1,2 +1,2 @@
 <86>1 2015-08-05T21:58:59.693Z 192.168.2.132 SecureAuth0 23108 ID52020 [SecureAuth@27389 UserAgent="Mozilla/5.0 (Windows NT 6.1; WOW64; Trident/7.0; rv:11.0) like Gecko" UserHostAddress="192.168.2.132" BrowserSession="0gvhdi5udjuqtweprbgoxilc" Realm="SecureAuth0" Appliance="secureauthqa.gosecureauth.com" Company="SecureAuth Corporation" UserID="Tester2" PEN="27389" HostName="192.168.2.132" Category="AUDIT" Priority="4"] Found the user for retrieving user's profile
-<134>1 2016-04-01T16:44:58Z MacBook-Pro-3 - 94473 - - {"pid":94473,"hostname":"MacBook-Pro-3","level":30,"msg":"hello world","time":1459529098958,"v":1}
\ No newline at end of file
+<134>1 2016-04-01T16:44:00.58Z MacBook-Pro-3 - 94473 - - {"pid":94473,"hostname":"MacBook-Pro-3","level":30,"msg":"hello world","time":1459529098958,"v":1}
\ No newline at end of file
