This is an automated email from the ASF dual-hosted git repository.

cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git


The following commit(s) were added to refs/heads/master by this push:
     new 29716db  DRILL-7533: Convert Pcapng to EVF
29716db is described below

commit 29716db80cee6dc82a7045a020b0c07053a61bd8
Author: luoc <[email protected]>
AuthorDate: Wed Dec 23 17:11:34 2020 +0800

    DRILL-7533: Convert Pcapng to EVF
---
 .../apache/drill/exec/store/pcapng/PcapColumn.java | 1020 ++++++++++++++++++++
 .../drill/exec/store/pcapng/PcapngBatchReader.java |  275 ++++++
 .../exec/store/pcapng/PcapngFormatConfig.java      |   38 +-
 .../exec/store/pcapng/PcapngFormatPlugin.java      |  103 +-
 .../exec/store/pcapng/PcapngRecordReader.java      |  214 ----
 .../drill/exec/store/pcapng/schema/Column.java     |   28 -
 .../exec/store/pcapng/schema/DummyArrayImpl.java   |   34 -
 .../drill/exec/store/pcapng/schema/DummyImpl.java  |   34 -
 .../drill/exec/store/pcapng/schema/Schema.java     |  441 ---------
 .../drill/exec/store/pcapng/schema/Util.java       |   59 --
 .../store/dfs/TestFormatPluginOptionExtractor.java |    6 +
 .../drill/exec/store/pcapng/TestPcapngHeaders.java |  213 ----
 .../exec/store/pcapng/TestPcapngRecordReader.java  |  205 +++-
 .../store/pcapng/TestPcapngStatRecordReader.java   |  139 +++
 14 files changed, 1678 insertions(+), 1131 deletions(-)

diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapColumn.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapColumn.java
new file mode 100644
index 0000000..ec9d5f5
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapColumn.java
@@ -0,0 +1,1020 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.store.pcap.PcapFormatUtils;
+import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.joda.time.Instant;
+
+import fr.bmartel.pcapdecoder.structure.options.inter.IOptionsStatisticsHeader;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IDescriptionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import fr.bmartel.pcapdecoder.structure.types.inter.INameResolutionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.ISectionHeaderBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IStatisticsBlock;
+
+public abstract class PcapColumn {
+
+  private static final Map<String, PcapColumn> columns = new LinkedHashMap<>();
+  private static final Map<String, PcapColumn> summary_columns = new 
LinkedHashMap<>();
+  public static final String DUMMY_NAME = "dummy";
+  public static final String PATH_NAME = "path";
+
+  static {
+    // Basic
+    columns.put("timestamp", new PcapTimestamp());
+    columns.put("packet_length", new PcapPacketLength());
+    columns.put("type", new PcapType());
+    columns.put("src_ip", new PcapSrcIp());
+    columns.put("dst_ip", new PcapDstIp());
+    columns.put("src_port", new PcapSrcPort());
+    columns.put("dst_port", new PcapDstPort());
+    columns.put("src_mac_address", new PcapSrcMac());
+    columns.put("dst_mac_address", new PcapDstMac());
+    columns.put("tcp_session", new PcapTcpSession());
+    columns.put("tcp_ack", new PcapTcpAck());
+    columns.put("tcp_flags", new PcapTcpFlags());
+    columns.put("tcp_flags_ns", new PcapTcpFlagsNs());
+    columns.put("tcp_flags_cwr", new PcapTcpFlagsCwr());
+    columns.put("tcp_flags_ece", new PcapTcpFlagsEce());
+    columns.put("tcp_flags_ece_ecn_capable", new PcapTcpFlagsEceEcnCapable());
+    columns.put("tcp_flags_ece_congestion_experienced", new 
PcapTcpFlagsEceCongestionExperienced());
+    columns.put("tcp_flags_urg", new PcapTcpFlagsUrg());
+    columns.put("tcp_flags_ack", new PcapTcpFlagsAck());
+    columns.put("tcp_flags_psh", new PcapTcpFlagsPsh());
+    columns.put("tcp_flags_rst", new PcapTcpFlagsRst());
+    columns.put("tcp_flags_syn", new PcapTcpFlagsSyn());
+    columns.put("tcp_flags_fin", new PcapTcpFlagsFin());
+    columns.put("tcp_parsed_flags", new PcapTcpParsedFlags());
+    columns.put("packet_data", new PcapPacketData());
+
+    // Extensions
+    summary_columns.put("path", new PcapStatPath());
+    // Section Header Block
+    summary_columns.put("shb_hardware", new PcapHardware());
+    summary_columns.put("shb_os", new PcapOS());
+    summary_columns.put("shb_userappl", new PcapUserAppl());
+    // Interface Description Block
+    summary_columns.put("if_name", new PcapIfName());
+    summary_columns.put("if_description", new PcapIfDescription());
+    summary_columns.put("if_ipv4addr", new PcapIfIPv4addr());
+    summary_columns.put("if_ipv6addr", new PcapIfIPv6addr());
+    summary_columns.put("if_macaddr", new PcapIfMACaddr());
+    summary_columns.put("if_euiaddr", new PcapIfEUIaddr());
+    summary_columns.put("if_speed", new PcapIfSpeed());
+    summary_columns.put("if_tsresol", new PcapIfTsresol());
+    summary_columns.put("if_tzone", new PcapIfTzone());
+    summary_columns.put("if_os", new PcapIfOS());
+    summary_columns.put("if_fcslen", new PcapIfFcslen());
+    summary_columns.put("if_tsoffset", new PcapIfTsOffset());
+    // Name Resolution Block
+    summary_columns.put("ns_dnsname", new PcapDnsName());
+    summary_columns.put("ns_dnsip4addr", new PcapDnsIP4addr());
+    summary_columns.put("ns_dnsip6addr", new PcapDnsIP6addr());
+    // Interface Statistics Block
+    summary_columns.put("isb_starttime", new PcapIsbStarttime());
+    summary_columns.put("isb_endtime", new PcapIsbEndtime());
+    summary_columns.put("isb_ifrecv", new PcapIsbIfrecv());
+    summary_columns.put("isb_ifdrop", new PcapIsbIfdrop());
+    summary_columns.put("isb_filteraccept", new PcapIsbFilterAccept());
+    summary_columns.put("isb_osdrop", new PcapIsbOSdrop());
+    summary_columns.put("isb_usrdeliv", new PcapIsbUsrdeliv());
+  }
+
+  abstract MajorType getType();
+
+  abstract void process(IPcapngType block, ScalarWriter writer);
+
+  public static Map<String, PcapColumn> getColumns() {
+    return Collections.unmodifiableMap(columns);
+  }
+
+  public static Map<String, PcapColumn> getSummaryColumns() {
+    return Collections.unmodifiableMap(summary_columns);
+  }
+
+  static class PcapDummy extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) { }
+  }
+
+  static class PcapStatPath extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) { }
+  }
+
+  static class PcapTimestamp extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.required(MinorType.TIMESTAMP);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      writer.setTimestamp(Instant.ofEpochMilli(((IEnhancedPacketBLock) 
block).getTimeStamp() / 1000));
+    }
+  }
+
+  static class PcapPacketLength extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.required(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      writer.setInt(((IEnhancedPacketBLock) block).getPacketLength());
+    }
+  }
+
+  static class PcapType extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setString(packet.getPacketType());
+      }
+    }
+  }
+
+  static class PcapSrcIp extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setString(packet.getSrc_ip().getHostAddress());
+      }
+    }
+  }
+
+  static class PcapDstIp extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setString(packet.getDst_ip().getHostAddress());
+      }
+    }
+  }
+
+  static class PcapSrcPort extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setInt(packet.getSrc_port());
+      }
+    }
+  }
+
+  static class PcapDstPort extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setInt(packet.getDst_port());
+      }
+    }
+  }
+
+  static class PcapSrcMac extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setString(packet.getEthernetSource());
+      }
+    }
+  }
+
+  static class PcapDstMac extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setString(packet.getEthernetDestination());
+      }
+    }
+  }
+
+  static class PcapTcpSession extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setLong(packet.getSessionHash());
+      }
+    }
+  }
+
+  static class PcapTcpAck extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setInt(packet.getAckNumber());
+      }
+    }
+  }
+
+  static class PcapTcpFlags extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setInt(packet.getFlags());
+      }
+    }
+  }
+
+  static class PcapTcpFlagsNs extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setBoolean((packet.getFlags() & 0x100) != 0);
+      }
+    }
+  }
+
+  static class PcapTcpFlagsCwr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setBoolean((packet.getFlags() & 0x80) != 0);
+      }
+    }
+  }
+
+  static class PcapTcpFlagsEce extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setBoolean((packet.getFlags() & 0x40) != 0);
+      }
+    }
+  }
+
+  static class PcapTcpFlagsEceEcnCapable extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setBoolean((packet.getFlags() & 0x42) == 0x42);
+      }
+    }
+  }
+
+  static class PcapTcpFlagsEceCongestionExperienced extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setBoolean((packet.getFlags() & 0x42) == 0x40);
+      }
+    }
+  }
+
+  static class PcapTcpFlagsUrg extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setBoolean((packet.getFlags() & 0x20) != 0);
+      }
+    }
+  }
+
+  static class PcapTcpFlagsAck extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setBoolean((packet.getFlags() & 0x10) != 0);
+      }
+    }
+  }
+
+  static class PcapTcpFlagsPsh extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setBoolean((packet.getFlags() & 0x8) != 0);
+      }
+    }
+  }
+
+  static class PcapTcpFlagsRst extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setBoolean((packet.getFlags() & 0x4) != 0);
+      }
+    }
+  }
+
+  static class PcapTcpFlagsSyn extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setBoolean((packet.getFlags() & 0x2) != 0);
+      }
+    }
+  }
+
+  static class PcapTcpFlagsFin extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setBoolean((packet.getFlags() & 0x1) != 0);
+      }
+    }
+  }
+
+  static class PcapTcpParsedFlags extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        writer.setString(packet.getParsedFlags());
+      }
+    }
+  }
+
+  static class PcapPacketData extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        
writer.setString(PcapFormatUtils.parseBytesToASCII(((IEnhancedPacketBLock) 
block).getPacketData()));
+      }
+    }
+  }
+
+  /**
+   * shb_hardware: description of the hardware
+   */
+  static class PcapHardware extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof ISectionHeaderBlock)) {
+        return;
+      }
+      writer.setString(((ISectionHeaderBlock) 
block).getOptions().getHardware());
+    }
+  }
+
+  // Section Header Block
+
+  /**
+   * shb_os: name of the OS
+   */
+  static class PcapOS extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof ISectionHeaderBlock)) {
+        return;
+      }
+      writer.setString(((ISectionHeaderBlock) block).getOptions().getOS());
+    }
+  }
+
+  /**
+   * shb_userappl: name of the user application
+   */
+  static class PcapUserAppl extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof ISectionHeaderBlock)) {
+        return;
+      }
+      writer.setString(((ISectionHeaderBlock) 
block).getOptions().getUserAppl());
+    }
+  }
+
+  // Interface Description Block
+
+  /**
+   * if_name: name of the device used to capture
+   */
+  static class PcapIfName extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setString(((IDescriptionBlock) 
block).getOptions().getInterfaceName());
+    }
+  }
+
+  /**
+   * if_description: Description of the device used to capture the data
+   */
+  static class PcapIfDescription extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setString(((IDescriptionBlock) 
block).getOptions().getInterfaceDescription());
+    }
+  }
+
+  /**
+   * if_IPv4addr: IPV4 address
+   */
+  static class PcapIfIPv4addr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setString(((IDescriptionBlock) 
block).getOptions().getInterfaceIpv4NetworkAddr());
+    }
+  }
+
+  /**
+   * if_IPv6addr: IPV6 address
+   */
+  static class PcapIfIPv6addr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setString(((IDescriptionBlock) 
block).getOptions().getIpv6NetworkAddr());
+    }
+  }
+
+  /**
+   * if_MACaddr: MAC address
+   */
+  static class PcapIfMACaddr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setString(((IDescriptionBlock) 
block).getOptions().getInterfaceMacAddr());
+    }
+  }
+
+  /**
+   * if_EUIaddr: EUI address
+   */
+  static class PcapIfEUIaddr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setString(((IDescriptionBlock) 
block).getOptions().getInterfaceEuiAddr());
+    }
+  }
+
+  /**
+   * if_speed: interface speed in bps
+   */
+  static class PcapIfSpeed extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setInt(((IDescriptionBlock) 
block).getOptions().getInterfaceSpeed());
+    }
+  }
+
+  /**
+   * if_tsresol: Resolution of timestamp (6 means microsecond resolution for 
instance)
+   */
+  static class PcapIfTsresol extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setInt(((IDescriptionBlock) 
block).getOptions().getTimeStampResolution());
+    }
+  }
+
+  /**
+   * if_tzone: indicate Time zone => offset from UTC time
+   */
+  static class PcapIfTzone extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setInt(((IDescriptionBlock) block).getOptions().getTimeBias());
+    }
+  }
+
+  /**
+   * if_os: Name of the operating system
+   */
+  static class PcapIfOS extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setString(((IDescriptionBlock) 
block).getOptions().getInterfaceOperatingSystem());
+    }
+  }
+
+  /**
+   * if_fcslen: Length of the Frame Check Sequence (in bits)
+   */
+  static class PcapIfFcslen extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setInt(((IDescriptionBlock) 
block).getOptions().getInterfaceFrameCheckSequenceLength());
+    }
+  }
+
+  /**
+   * if_tsoffset: Timestamp offset for each packet / if not present timestamp 
are absolute
+   */
+  static class PcapIfTsOffset extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IDescriptionBlock)) {
+        return;
+      }
+      writer.setInt(((IDescriptionBlock) 
block).getOptions().getTimeStampOffset());
+    }
+  }
+
+  // Name Resolution Block
+
+  /**
+   * ns_dnsname: Retrieve DNS server name
+   */
+  static class PcapDnsName extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof INameResolutionBlock)) {
+        return;
+      }
+      writer.setString(((INameResolutionBlock) 
block).getOptions().getDnsName());
+    }
+  }
+
+  /**
+   * ns_dnsIP4addr: Retrieve DNS IPV4 server address
+   */
+  static class PcapDnsIP4addr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof INameResolutionBlock)) {
+        return;
+      }
+      writer.setString(((INameResolutionBlock) 
block).getOptions().getDnsIpv4Addr());
+    }
+  }
+
+  /**
+   * ns_dnsIP6addr: Retrieve DNS IPV6 server address
+   */
+  static class PcapDnsIP6addr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof INameResolutionBlock)) {
+        return;
+      }
+      writer.setString(((INameResolutionBlock) 
block).getOptions().getDnsIpv6Addr());
+    }
+  }
+
+  // Interface Statistics Block
+
+  /**
+   * isb_starttime: capture start time (timestamp resolution is defined in 
Interface description header check exemple)
+   */
+  static class PcapIsbStarttime extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.TIMESTAMP);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IStatisticsBlock)) {
+        return;
+      }
+      IOptionsStatisticsHeader statisticsHeader = ((IStatisticsBlock) 
block).getOptions();
+      
writer.setTimestamp(Instant.ofEpochMilli(statisticsHeader.getCaptureStartTime() 
/ 1000));
+    }
+  }
+
+  /**
+   * isb_endtime: capture end time (timestamp resolution is defined in 
Interface description header check example)
+   */
+  static class PcapIsbEndtime extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.TIMESTAMP);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IStatisticsBlock)) {
+        return;
+      }
+      IOptionsStatisticsHeader statisticsHeader = ((IStatisticsBlock) 
block).getOptions();
+      
writer.setTimestamp(Instant.ofEpochMilli(statisticsHeader.getCaptureEndTime() / 
1000));
+    }
+  }
+
+  /**
+   * isb_ifrecv: packet received count
+   */
+  static class PcapIsbIfrecv extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IStatisticsBlock)) {
+        return;
+      }
+      writer.setLong(((IStatisticsBlock) 
block).getOptions().getPacketReceivedCount());
+    }
+  }
+
+  /**
+   * isb_ifdrop: packet drop count
+   */
+  static class PcapIsbIfdrop extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IStatisticsBlock)) {
+        return;
+      }
+      writer.setLong(((IStatisticsBlock) 
block).getOptions().getPacketDropCount());
+    }
+  }
+
+  /**
+   * isb_filteraccept: packet accepted by filter count
+   */
+  static class PcapIsbFilterAccept extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IStatisticsBlock)) {
+        return;
+      }
+      writer.setLong(((IStatisticsBlock) 
block).getOptions().getPacketAcceptedByFilterCount());
+    }
+  }
+
+  /**
+   * isb_osdrop: packet dropped by Operating system count
+   */
+  static class PcapIsbOSdrop extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IStatisticsBlock)) {
+        return;
+      }
+      writer.setLong(((IStatisticsBlock) 
block).getOptions().getPacketDroppedByOS());
+    }
+  }
+
+  /**
+   * isb_usrdeliv: packet deliver to use count
+   */
+  static class PcapIsbUsrdeliv extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (!(block instanceof IStatisticsBlock)) {
+        return;
+      }
+      writer.setLong(((IStatisticsBlock) 
block).getOptions().getPacketDeliveredToUser());
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
new file mode 100644
index 0000000..eeabebf
--- /dev/null
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import fr.bmartel.pcapdecoder.PcapDecoder;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+
+public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(PcapngBatchReader.class);
+
+  private final PcapngFormatConfig config;
+  private final EasySubScan scan;
+  private final int maxRecords;
+  private CustomErrorContext errorContext;
+  private List<SchemaPath> columns;
+  private List<ColumnDefn> projectedColumns;
+  private Iterator<IPcapngType> pcapIterator;
+  private IPcapngType block;
+  private RowSetLoader loader;
+  private InputStream in;
+  private Path path;
+
+  public PcapngBatchReader(final PcapngFormatConfig config, final EasySubScan 
scan) {
+    this.config = config;
+    this.scan = scan;
+    this.maxRecords = scan.getMaxRecords();
+    this.columns = scan.getColumns();
+  }
+
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    try {
+      // init InputStream for pcap file
+      errorContext = negotiator.parentErrorContext();
+      DrillFileSystem dfs = negotiator.fileSystem();
+      path = dfs.makeQualified(negotiator.split().getPath());
+      in = dfs.openPossiblyCompressedStream(path);
+      // decode the pcap file
+      PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
+      decoder.decode();
+      pcapIterator = decoder.getSectionList().iterator();
+      logger.debug("The config is {}, root is {}, columns has {}", config, 
scan.getSelectionRoot(), columns);
+    } catch (IOException e) {
+      throw UserException
+             .dataReadError(e)
+             .message("Failure in initial pcapng inputstream. " + 
e.getMessage())
+             .addContext(errorContext)
+             .build(logger);
+    } catch (Exception e) {
+      throw UserException
+             .dataReadError(e)
+             .message("Failed to decode the pcapng file. " + e.getMessage())
+             .addContext(errorContext)
+             .build(logger);
+    }
+    // define the schema
+    negotiator.tableSchema(defineMetadata(), true);
+    ResultSetLoader resultSetLoader = negotiator.build();
+    loader = resultSetLoader.writer();
+    // bind the writer for columns
+    bindColumns(loader);
+    return true;
+  }
+
+  /**
+   * The default of the `stat` parameter is false,
+   * which means that the packet data is parsed and returned,
+   * but if true, will return the statistics data about each pcapng file 
only
+   * (consist of the information about collect devices and the summary of the 
packet data above).
+   *
+   * In addition, a pcapng file contains a single Section Header Block (SHB),
+   * a single Interface Description Block (IDB) and a few Enhanced Packet 
Blocks (EPB).
+   * <pre>
+   * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   * | SHB | IDB | EPB | EPB |    ...    | EPB |
+   * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   * </pre>
+   * 
https://pcapng.github.io/pcapng/draft-tuexen-opsawg-pcapng.html#name-physical-file-layout
+   */
+  @Override
+  public boolean next() {
+    while (!loader.isFull()) {
+      if (!pcapIterator.hasNext()) {
+        return false;
+      } else if (config.getStat() && isIEnhancedPacketBlock()) {
+        continue;
+      } else if (!config.getStat() && !isIEnhancedPacketBlock()) {
+        continue;
+      }
+      processBlock();
+      if (loader.limitReached(maxRecords)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(in);
+  }
+
+  private boolean isIEnhancedPacketBlock() {
+    block = pcapIterator.next();
+    return block instanceof IEnhancedPacketBLock;
+  }
+
+  private void processBlock() {
+    loader.start();
+    for (ColumnDefn columnDefn : projectedColumns) {
+      // pcapng file name
+      if (columnDefn.getName().equals(PcapColumn.PATH_NAME)) {
+        columnDefn.load(path.getName());
+      } else {
+        // pcapng block data
+        columnDefn.load(block);
+      }
+    }
+    loader.save();
+  }
+
+  private boolean isSkipQuery() {
+    return columns.isEmpty();
+  }
+
+  private boolean isStarQuery() {
+    return Utilities.isStarQuery(columns);
+  }
+
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    processProjected(columns);
+    for (ColumnDefn columnDefn : projectedColumns) {
+      columnDefn.define(builder);
+    }
+    return builder.buildSchema();
+  }
+
+  /**
+   * <b> Define the schema based on projected </b><br/>
+   * 1. SkipQuery: no field specified, such as count(*) <br/>
+   * 2. StarQuery: select * <br/>
+   * 3. ProjectPushdownQuery: select a,b,c <br/>
+   */
+  private void processProjected(List<SchemaPath> columns) {
+    projectedColumns = new ArrayList<ColumnDefn>();
+    if (isSkipQuery()) {
+      projectedColumns.add(new ColumnDefn(PcapColumn.DUMMY_NAME, new 
PcapColumn.PcapDummy()));
+    } else if (isStarQuery()) {
+      Set<Map.Entry<String, PcapColumn>> pcapColumns;
+      if (config.getStat()) {
+        pcapColumns = PcapColumn.getSummaryColumns().entrySet();
+      } else {
+        pcapColumns = PcapColumn.getColumns().entrySet();
+      }
+      for (Map.Entry<String, PcapColumn> pcapColumn : pcapColumns) {
+        makePcapColumns(projectedColumns, pcapColumn.getKey(), 
pcapColumn.getValue());
+      }
+    } else {
+      for (SchemaPath schemaPath : columns) {
+        // Support Case-Insensitive
+        String projectedName = schemaPath.rootName().toLowerCase();
+        PcapColumn pcapColumn;
+        if (config.getStat()) {
+          pcapColumn = PcapColumn.getSummaryColumns().get(projectedName);
+        } else {
+          pcapColumn = PcapColumn.getColumns().get(projectedName);
+        }
+        if (pcapColumn != null) {
+          makePcapColumns(projectedColumns, projectedName, pcapColumn);
+        } else {
+          makePcapColumns(projectedColumns, projectedName, new 
PcapColumn.PcapDummy());
+          logger.debug("{} missing the PcapColumn implement class.", 
projectedName);
+        }
+      }
+    }
+    Collections.unmodifiableList(projectedColumns);
+  }
+
+  private void makePcapColumns(List<ColumnDefn> projectedColumns, String name, 
PcapColumn column) {
+    projectedColumns.add(new ColumnDefn(name, column));
+  }
+
+  private void bindColumns(RowSetLoader loader) {
+    for (ColumnDefn columnDefn : projectedColumns) {
+      columnDefn.bind(loader);
+    }
+  }
+
+  private static class ColumnDefn {
+
+    private final String name;
+    private PcapColumn processor;
+    private ScalarWriter writer;
+
+    public ColumnDefn(String name, PcapColumn column) {
+      this.name = name;
+      this.processor = column;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public PcapColumn getProcessor() {
+      return processor;
+    }
+
+    public void bind(RowSetLoader loader) {
+      writer = loader.scalar(getName());
+    }
+
+    public void define(SchemaBuilder builder) {
+      if (getProcessor().getType().getMode() == DataMode.REQUIRED) {
+        builder.add(getName(), getProcessor().getType().getMinorType());
+      } else {
+        builder.addNullable(getName(), 
getProcessor().getType().getMinorType());
+      }
+    }
+
+    public void load(IPcapngType block) {
+      getProcessor().process(block, writer);
+    }
+
+    public void load(String value) {
+      writer.setString(value);
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
index 8ded7ad..7210f93 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
@@ -17,24 +17,42 @@
  */
 package org.apache.drill.exec.store.pcapng;
 
-import com.fasterxml.jackson.annotation.JsonTypeName;
+import java.util.List;
+import java.util.Objects;
 
 import org.apache.drill.common.PlanStringBuilder;
 import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
 
-@JsonTypeName("pcapng")
+@JsonTypeName(PcapngFormatConfig.NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
 public class PcapngFormatConfig implements FormatPluginConfig {
 
-  public List<String> extensions = Collections.singletonList("pcapng");
+  public static final String NAME = "pcapng";
+  private final List<String> extensions;
+  private final boolean stat;
+
+  @JsonCreator
+  public PcapngFormatConfig(@JsonProperty("extensions") List<String> 
extensions, @JsonProperty("stat") boolean stat) {
+    this.extensions = extensions == null ? 
ImmutableList.of(PcapngFormatConfig.NAME) : ImmutableList.copyOf(extensions);
+    this.stat = stat;
+  }
 
+  @JsonProperty("extensions")
   public List<String> getExtensions() {
     return extensions;
   }
 
+  @JsonProperty("stat")
+  public boolean getStat() {
+    return this.stat;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
@@ -44,18 +62,16 @@ public class PcapngFormatConfig implements 
FormatPluginConfig {
       return false;
     }
     PcapngFormatConfig that = (PcapngFormatConfig) o;
-    return Objects.equals(extensions, that.extensions);
+    return Objects.equals(extensions, that.extensions) && Objects.equals(stat, 
that.getStat());
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(extensions);
+    return Objects.hash(extensions, stat);
   }
 
   @Override
   public String toString() {
-    return new PlanStringBuilder(this)
-        .field("extensions", extensions)
-        .toString();
+    return new PlanStringBuilder(this).field("extensions", 
extensions).field("stat", stat).toString();
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
index 41be760..d80e8ac 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
@@ -17,79 +17,76 @@
  */
 package org.apache.drill.exec.store.pcapng;
 
-import java.io.IOException;
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.planner.common.DrillStatsTable;
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import 
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
 import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
 import org.apache.hadoop.conf.Configuration;
 
-import java.util.List;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
 public class PcapngFormatPlugin extends EasyFormatPlugin<PcapngFormatConfig> {
 
-  public static final String DEFAULT_NAME = "pcapng";
-
-  public PcapngFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf,
-                            StoragePluginConfig storagePluginConfig) {
-    this(name, context, fsConf, storagePluginConfig, new PcapngFormatConfig());
+  public PcapngFormatPlugin(String name,
+                            DrillbitContext context,
+                            Configuration fsConf,
+                            StoragePluginConfig storageConfig,
+                            PcapngFormatConfig formatConfig) {
+    super(name, easyConfig(fsConf, formatConfig), context, storageConfig, 
formatConfig);
   }
 
-  public PcapngFormatPlugin(String name, DrillbitContext context, 
Configuration fsConf, StoragePluginConfig config, PcapngFormatConfig 
formatPluginConfig) {
-    super(name, context, fsConf, config, formatPluginConfig, true,
-        false, false, true,
-        formatPluginConfig.getExtensions(), DEFAULT_NAME);
+  private static EasyFormatConfig easyConfig(Configuration fsConf, 
PcapngFormatConfig pluginConfig) {
+    EasyFormatConfig config = new EasyFormatConfig();
+    config.readable = true;
+    config.writable = false;
+    config.blockSplittable = false;
+    config.compressible = true;
+    config.extensions = pluginConfig.getExtensions();
+    config.fsConf = fsConf;
+    config.readerOperatorType = CoreOperatorType.PCAPNG_SUB_SCAN_VALUE;
+    config.useEnhancedScan = true;
+    config.supportsLimitPushdown = true;
+    config.supportsProjectPushdown = true;
+    config.defaultName = PcapngFormatConfig.NAME;
+    return config;
   }
 
-  @Override
-  public boolean supportsPushDown() {
-    return true;
-  }
+  private static class PcapngReaderFactory extends FileReaderFactory {
 
-  @Override
-  public RecordReader getRecordReader(FragmentContext context, DrillFileSystem 
dfs,
-                                      FileWork fileWork, List<SchemaPath> 
columns,
-                                      String userName) {
-    return new PcapngRecordReader(fileWork.getPath(), dfs, columns);
-  }
+    private final PcapngFormatConfig config;
+    private final EasySubScan scan;
 
-  @Override
-  public RecordWriter getRecordWriter(FragmentContext context, EasyWriter 
writer) {
-    throw new UnsupportedOperationException("unimplemented");
-  }
+    public PcapngReaderFactory(PcapngFormatConfig config, EasySubScan scan) {
+      this.config = config;
+      this.scan = scan;
+    }
 
-  @Override
-  public int getReaderOperatorType() {
-    return UserBitShared.CoreOperatorType.PCAPNG_SUB_SCAN_VALUE;
-  }
-
-  @Override
-  public int getWriterOperatorType() {
-    throw new UnsupportedOperationException("unimplemented");
-  }
-
-  @Override
-  public boolean supportsStatistics() {
-    return false;
+    @Override
+    public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+      return new PcapngBatchReader(config, scan);
+    }
   }
 
   @Override
-  public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path 
statsTablePath) throws IOException {
-    return null;
+  public ManagedReader<? extends FileSchemaNegotiator> 
newBatchReader(EasySubScan scan, OptionManager options)
+      throws ExecutionSetupException {
+    return new PcapngBatchReader(formatConfig, scan);
   }
 
   @Override
-  public void writeStatistics(DrillStatsTable.TableStatistics statistics, 
FileSystem fs, Path statsTablePath) throws IOException {
+  protected FileScanBuilder frameworkBuilder(OptionManager options, 
EasySubScan scan) throws ExecutionSetupException {
+    FileScanBuilder builder = new FileScanBuilder();
+    builder.setReaderFactory(new PcapngReaderFactory(formatConfig, scan));
 
+    initScanBuilder(builder, scan);
+    builder.nullType(Types.optional(MinorType.VARCHAR));
+    return builder;
   }
 }
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
deleted file mode 100644
index 152e2e6..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng;
-
-import fr.bmartel.pcapdecoder.PcapDecoder;
-import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
-import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
-import org.apache.commons.io.IOUtils;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.pcapng.schema.Column;
-import org.apache.drill.exec.store.pcapng.schema.DummyArrayImpl;
-import org.apache.drill.exec.store.pcapng.schema.DummyImpl;
-import org.apache.drill.exec.store.pcapng.schema.Schema;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.function.BiConsumer;
-
-public class PcapngRecordReader extends AbstractRecordReader {
-  private static final Logger logger = 
LoggerFactory.getLogger(PcapngRecordReader.class);
-
-  // batch size should not exceed max allowed record count
-  private static final int BATCH_SIZE = 40_000;
-
-  private final Path pathToFile;
-  private OutputMutator output;
-  private List<ProjectedColumnInfo> projectedCols;
-  private DrillFileSystem fs;
-  private InputStream in;
-  private List<SchemaPath> columns;
-
-  private Iterator<IPcapngType> it;
-
-  public PcapngRecordReader(final Path pathToFile,
-                            final DrillFileSystem fileSystem,
-                            final List<SchemaPath> columns) {
-    this.fs = fileSystem;
-    this.pathToFile = fs.makeQualified(pathToFile);
-    this.columns = columns;
-    setColumns(columns);
-  }
-
-  @Override
-  public void setup(final OperatorContext context, final OutputMutator output) 
throws ExecutionSetupException {
-    try {
-
-      this.output = output;
-      this.in = fs.openPossiblyCompressedStream(pathToFile);
-      PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
-      decoder.decode();
-      this.it = decoder.getSectionList().iterator();
-      setupProjection();
-    } catch (IOException io) {
-      throw UserException.dataReadError(io)
-          .addContext("File name:", pathToFile.toUri().getPath())
-          .build(logger);
-    }
-  }
-
-  @Override
-  public int next() {
-    if (isSkipQuery()) {
-      return iterateOverBlocks((block, counter) -> {
-      });
-    } else {
-      return iterateOverBlocks((block, counter) -> 
putToTable((IEnhancedPacketBLock) block, counter));
-    }
-  }
-
-  private void putToTable(IEnhancedPacketBLock bLock, Integer counter) {
-    for (ProjectedColumnInfo pci : projectedCols) {
-      pci.getColumn().process(bLock, pci.getVv(), counter);
-    }
-  }
-
-  @Override
-  public void close() throws Exception {
-    if (in != null) {
-      in.close();
-      in = null;
-    }
-  }
-
-  private void setupProjection() {
-    if (isSkipQuery()) {
-      projectedCols = projectNone();
-    } else if (isStarQuery()) {
-      projectedCols = projectAllCols(Schema.getColumnsNames());
-    } else {
-      projectedCols = projectCols(columns);
-    }
-  }
-
-  private List<ProjectedColumnInfo> projectNone() {
-    List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
-    pciBuilder.add(makeColumn("dummy", new DummyImpl()));
-    return Collections.unmodifiableList(pciBuilder);
-  }
-
-  private List<ProjectedColumnInfo> projectAllCols(final Set<String> columns) {
-    List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
-    for (String colName : columns) {
-      pciBuilder.add(makeColumn(colName, Schema.getColumns().get(colName)));
-    }
-    return Collections.unmodifiableList(pciBuilder);
-  }
-
-  private List<ProjectedColumnInfo> projectCols(final List<SchemaPath> 
columns) {
-    List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
-    for (SchemaPath schemaPath : columns) {
-      String projectedName = schemaPath.rootName();
-      if (schemaPath.isArray()) {
-        pciBuilder.add(makeColumn(projectedName, new DummyArrayImpl()));
-      } else if (Schema.getColumns().containsKey(projectedName.toLowerCase())) 
{
-        pciBuilder.add(makeColumn(projectedName,
-            Schema.getColumns().get(projectedName.toLowerCase())));
-      } else {
-        pciBuilder.add(makeColumn(projectedName, new DummyImpl()));
-      }
-    }
-    return Collections.unmodifiableList(pciBuilder);
-  }
-
-  private ProjectedColumnInfo makeColumn(final String colName, final Column 
column) {
-    MaterializedField field = MaterializedField.create(colName, 
column.getMinorType());
-    ValueVector vector = getValueVector(field, output);
-    return new ProjectedColumnInfo(vector, column, colName);
-  }
-
-  private ValueVector getValueVector(final MaterializedField field, final 
OutputMutator output) {
-    try {
-      TypeProtos.MajorType majorType = field.getType();
-      final Class<? extends ValueVector> clazz = 
TypeHelper.getValueVectorClass(
-          majorType.getMinorType(), majorType.getMode());
-
-      return output.addField(field, clazz);
-    } catch (SchemaChangeException sce) {
-      throw UserException.internalError(sce)
-          .addContext("The addition of this field is incompatible with this 
OutputMutator's capabilities")
-          .build(logger);
-    }
-  }
-
-  private Integer iterateOverBlocks(BiConsumer<IPcapngType, Integer> consumer) 
{
-    int counter = 0;
-    while (it.hasNext() && counter < BATCH_SIZE) {
-      IPcapngType block = it.next();
-      if (block instanceof IEnhancedPacketBLock) {
-        consumer.accept(block, counter);
-        counter++;
-      }
-    }
-    return counter;
-  }
-
-  private static class ProjectedColumnInfo {
-
-    private ValueVector vv;
-    private Column colDef;
-    private String columnName;
-
-    ProjectedColumnInfo(ValueVector vv, Column colDef, String columnName) {
-      this.vv = vv;
-      this.colDef = colDef;
-      this.columnName = columnName;
-    }
-
-    public ValueVector getVv() {
-      return vv;
-    }
-
-    Column getColumn() {
-      return colDef;
-    }
-
-    public String getColumnName() {
-      return columnName;
-    }
-  }
-}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java
deleted file mode 100644
index 109b7dd..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng.schema;
-
-import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.vector.ValueVector;
-
-public interface Column {
-  TypeProtos.MajorType getMinorType();
-
-  void process(IEnhancedPacketBLock block, ValueVector vv, int count);
-}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java
deleted file mode 100644
index 2023d19..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng.schema;
-
-import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class DummyArrayImpl implements Column {
-  @Override
-  public TypeProtos.MajorType getMinorType() {
-    return Types.repeated(TypeProtos.MinorType.INT);
-  }
-
-  @Override
-  public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
-  }
-}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java
deleted file mode 100644
index a8c26a0..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng.schema;
-
-import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class DummyImpl implements Column {
-  @Override
-  public TypeProtos.MajorType getMinorType() {
-    return Types.optional(TypeProtos.MinorType.INT);
-  }
-
-  @Override
-  public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
-  }
-}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java
deleted file mode 100644
index a9738bd..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng.schema;
-
-import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder;
-import org.apache.drill.exec.vector.ValueVector;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import static 
org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
-import static 
org.apache.drill.exec.store.pcapng.schema.Util.setNullableLongColumnValue;
-
-public class Schema {
-
-  private final static Map<String, Column> columns = new HashMap<>();
-
-  static {
-    columns.put("timestamp", new TimestampImpl());
-    columns.put("packet_length", new PacketLenImpl());
-    columns.put("type", new TypeImpl());
-    columns.put("src_ip", new SrcIpImpl());
-    columns.put("dst_ip", new DstIpImpl());
-    columns.put("src_port", new SrcPortImpl());
-    columns.put("dst_port", new DstPortImpl());
-    columns.put("src_mac_address", new SrcMacImpl());
-    columns.put("dst_mac_address", new DstMacImpl());
-    columns.put("tcp_session", new TcpSessionImpl());
-    columns.put("tcp_ack", new TcpAckImpl());
-    columns.put("tcp_flags", new TcpFlags());
-    columns.put("tcp_flags_ns", new TcpFlagsNsImpl());
-    columns.put("tcp_flags_cwr", new TcpFlagsCwrImpl());
-    columns.put("tcp_flags_ece", new TcpFlagsEceImpl());
-    columns.put("tcp_flags_ece_ecn_capable", new TcpFlagsEceEcnCapableImpl());
-    columns.put("tcp_flags_ece_congestion_experienced", new 
TcpFlagsEceCongestionExperiencedImpl());
-    columns.put("tcp_flags_urg", new TcpFlagsUrgIml());
-    columns.put("tcp_flags_ack", new TcpFlagsAckImpl());
-    columns.put("tcp_flags_psh", new TcpFlagsPshImpl());
-    columns.put("tcp_flags_rst", new TcpFlagsRstImpl());
-    columns.put("tcp_flags_syn", new TcpFlagsSynImpl());
-    columns.put("tcp_flags_fin", new TcpFlagsFinImpl());
-    columns.put("tcp_parsed_flags", new TcpParsedFlags());
-    columns.put("packet_data", new PacketDataImpl());
-  }
-
-  public static Map<String, Column> getColumns() {
-    return columns;
-  }
-
-  public static Set<String> getColumnsNames() {
-    return columns.keySet();
-  }
-
-  static class TimestampImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.required(TypeProtos.MinorType.TIMESTAMP);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      Util.setTimestampColumnValue(block.getTimeStamp(), vv, count);
-    }
-  }
-
-  static class PacketLenImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.required(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      Util.setIntegerColumnValue(block.getPacketLength(), vv, count);
-    }
-  }
-
-  static class TypeImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.VARCHAR);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableStringColumnValue(packet.getPacketType(), vv, count);
-      }
-    }
-  }
-
-  static class SrcIpImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.VARCHAR);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableStringColumnValue(packet.getSrc_ip().getHostAddress(), 
vv, count);
-      }
-    }
-  }
-
-  static class DstIpImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.VARCHAR);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableStringColumnValue(packet.getDst_ip().getHostAddress(), 
vv, count);
-      }
-    }
-  }
-
-  static class SrcPortImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableIntegerColumnValue(packet.getSrc_port(), vv, count);
-      }
-    }
-  }
-
-  static class DstPortImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableIntegerColumnValue(packet.getDst_port(), vv, count);
-      }
-    }
-  }
-
-  static class SrcMacImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.VARCHAR);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableStringColumnValue(packet.getEthernetSource(), vv, 
count);
-      }
-    }
-  }
-
-  static class DstMacImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.VARCHAR);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableStringColumnValue(packet.getEthernetDestination(), vv, 
count);
-      }
-    }
-  }
-
-  static class TcpSessionImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.BIGINT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        setNullableLongColumnValue(packet.getSessionHash(), vv, count);
-      }
-    }
-  }
-
-  static class TcpAckImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableIntegerColumnValue(packet.getAckNumber(), vv, count);
-      }
-    }
-  }
-
-  static class TcpFlags implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableIntegerColumnValue(packet.getFlags(), vv, count);
-      }
-    }
-  }
-
-  static class TcpFlagsNsImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableBooleanColumnValue((packet.getFlags() & 0x100) != 0, 
vv, count);
-      }
-    }
-  }
-
-  static class TcpFlagsCwrImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableBooleanColumnValue((packet.getFlags() & 0x80) != 0, 
vv, count);
-      }
-    }
-  }
-
-  static class TcpFlagsEceImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableBooleanColumnValue((packet.getFlags() & 0x40) != 0, 
vv, count);
-      }
-    }
-  }
-
-  static class TcpFlagsEceEcnCapableImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x42, 
vv, count);
-      }
-    }
-  }
-
-  static class TcpFlagsEceCongestionExperiencedImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x40, 
vv, count);
-      }
-    }
-  }
-
-  static class TcpFlagsUrgIml implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableBooleanColumnValue((packet.getFlags() & 0x20) != 0, 
vv, count);
-      }
-    }
-  }
-
-  static class TcpFlagsAckImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableBooleanColumnValue((packet.getFlags() & 0x10) != 0, 
vv, count);
-      }
-    }
-  }
-
-  static class TcpFlagsPshImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableBooleanColumnValue((packet.getFlags() & 0x8) != 0, vv, 
count);
-      }
-    }
-  }
-
-  static class TcpFlagsRstImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableBooleanColumnValue((packet.getFlags() & 0x4) != 0, vv, 
count);
-      }
-    }
-  }
-
-  static class TcpFlagsSynImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableBooleanColumnValue((packet.getFlags() & 0x2) != 0, vv, 
count);
-      }
-    }
-  }
-
-  static class TcpFlagsFinImpl implements Column {
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.INT);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableBooleanColumnValue((packet.getFlags() & 0x1) != 0, vv, 
count);
-      }
-    }
-  }
-
-  static class TcpParsedFlags implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.VARCHAR);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        Util.setNullableStringColumnValue(packet.getParsedFlags(), vv, count);
-      }
-    }
-  }
-
-  static class PacketDataImpl implements Column {
-    @Override
-    public TypeProtos.MajorType getMinorType() {
-      return Types.optional(TypeProtos.MinorType.VARCHAR);
-    }
-
-    @Override
-    public void process(IEnhancedPacketBLock block, ValueVector vv, int count) 
{
-      PacketDecoder packet = new PacketDecoder();
-      if (packet.readPcapng(block.getPacketData())) {
-        
Util.setNullableStringColumnValue(parseBytesToASCII(block.getPacketData()), vv, 
count);
-      }
-    }
-  }
-}
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java
deleted file mode 100644
index 06e8e6a..0000000
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng.schema;
-
-import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.TimeStampVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-public class Util {
-  static void setNullableIntegerColumnValue(final int data, final ValueVector 
vv, final int count) {
-    ((NullableIntVector.Mutator) vv.getMutator())
-        .setSafe(count, data);
-  }
-
-  static void setIntegerColumnValue(final int data, final ValueVector vv, 
final int count) {
-    ((IntVector.Mutator) vv.getMutator())
-        .setSafe(count, data);
-  }
-
-  static void setTimestampColumnValue(final long data, final ValueVector vv, 
final int count) {
-    ((TimeStampVector.Mutator) vv.getMutator())
-        .setSafe(count, data / 1000);
-  }
-
-  static void setNullableLongColumnValue(final long data, final ValueVector 
vv, final int count) {
-    ((NullableBigIntVector.Mutator) vv.getMutator())
-        .setSafe(count, data);
-  }
-
-  static void setNullableStringColumnValue(final String data, final 
ValueVector vv, final int count) {
-    ((NullableVarCharVector.Mutator) vv.getMutator())
-        .setSafe(count, data.getBytes(UTF_8), 0, data.length());
-  }
-
-  static void setNullableBooleanColumnValue(final boolean data, final 
ValueVector vv, final int count) {
-    ((NullableIntVector.Mutator) vv.getMutator())
-        .setSafe(count, data ? 1 : 0);
-  }
-}
\ No newline at end of file
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index 871233b..92a91aab 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -58,8 +58,14 @@ public class TestFormatPluginOptionExtractor extends 
BaseTest {
           assertEquals(d.typeName, "(type: String, autoCorrectCorruptDates: 
boolean, enableStringsSignedMinMax: boolean)", d.presentParams());
           break;
         case "json":
+          assertEquals(d.typeName, "(type: String)", d.presentParams());
+          break;
         case "sequencefile":
+          assertEquals(d.typeName, "(type: String)", d.presentParams());
+          break;
         case "pcapng":
+          assertEquals(d.typeName, "(type: String, stat: boolean)", 
d.presentParams());
+          break;
         case "avro":
           assertEquals(d.typeName, "(type: String)", d.presentParams());
           break;
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java
deleted file mode 100644
index 6228766..0000000
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng;
-
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.metadata.TupleSchema;
-import org.apache.drill.test.ClusterFixture;
-import org.apache.drill.test.ClusterTest;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
-import org.apache.drill.test.rowSet.RowSetComparison;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.file.Paths;
-
-public class TestPcapngHeaders extends ClusterTest {
-  @BeforeClass
-  public static void setupTestFiles() throws Exception {
-    startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
-    dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng"));
-  }
-
-  @Test
-  public void testValidHeadersForStarQuery() throws IOException {
-    String query = "select * from dfs.`store/pcapng/sniff.pcapng`";
-    RowSet actual = client.queryBuilder().sql(query).rowSet();
-
-    TupleMetadata expectedSchema = new TupleSchema();
-
-    expectedSchema.add(MaterializedField.create("tcp_flags_ece_ecn_capable", 
Types.optional(TypeProtos.MinorType.INT)));
-    
expectedSchema.add(MaterializedField.create("tcp_flags_ece_congestion_experienced",
 Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("tcp_flags_psh", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("type", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("tcp_flags_cwr", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("dst_ip", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("src_ip", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("tcp_flags_fin", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("tcp_flags_ece", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("tcp_flags", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("tcp_flags_ack", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("src_mac_address", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("tcp_flags_syn", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("tcp_flags_rst", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("timestamp", 
Types.required(TypeProtos.MinorType.TIMESTAMP)));
-    expectedSchema.add(MaterializedField.create("tcp_session", 
Types.optional(TypeProtos.MinorType.BIGINT)));
-    expectedSchema.add(MaterializedField.create("packet_data", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("tcp_parsed_flags", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("tcp_flags_ns", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("src_port", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("packet_length", 
Types.required(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("tcp_flags_urg", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("tcp_ack", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("dst_port", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("dst_mac_address", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-
-    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .build();
-    new RowSetComparison(expected)
-        .verifyAndClearAll(actual);
-  }
-
-  @Test
-  public void testValidHeadersForProjection() throws IOException {
-    String query = "select sRc_ip, dst_IP, dst_mAc_address, src_Port, 
tcp_session, `Timestamp`  from dfs.`store/pcapng/sniff.pcapng`";
-    RowSet actual = client.queryBuilder().sql(query).rowSet();
-
-    TupleMetadata expectedSchema = new TupleSchema();
-
-    expectedSchema.add(MaterializedField.create("sRc_ip", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("dst_IP", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("dst_mAc_address", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("src_Port", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("tcp_session", 
Types.optional(TypeProtos.MinorType.BIGINT)));
-    expectedSchema.add(MaterializedField.create("Timestamp", 
Types.required(TypeProtos.MinorType.TIMESTAMP)));
-
-    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .build();
-    new RowSetComparison(expected)
-        .verifyAndClearAll(actual);
-  }
-
-  @Test
-  public void testValidHeadersForMissColumns() throws IOException {
-    String query = "select `timestamp`, `name`, `color` from 
dfs.`store/pcapng/sniff.pcapng`";
-    RowSet actual = client.queryBuilder().sql(query).rowSet();
-
-    TupleMetadata expectedSchema = new TupleSchema();
-
-    expectedSchema.add(MaterializedField.create("timestamp", 
Types.required(TypeProtos.MinorType.TIMESTAMP)));
-    expectedSchema.add(MaterializedField.create("name", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("color", 
Types.optional(TypeProtos.MinorType.INT)));
-
-    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .build();
-    new RowSetComparison(expected)
-        .verifyAndClearAll(actual);
-  }
-
-  @Test
-  public void testMixColumns() throws IOException {
-    String query = "select src_ip, dst_ip, dst_mac_address, src_port, 
tcp_session, `timestamp`  from dfs.`store/pcapng/sniff.pcapng`";
-    RowSet actual = client.queryBuilder().sql(query).rowSet();
-
-    TupleMetadata expectedSchema = new TupleSchema();
-
-    expectedSchema.add(MaterializedField.create("sRc_ip", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("dst_IP", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("dst_mAc_address", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("src_Port", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("tcp_session", 
Types.optional(TypeProtos.MinorType.BIGINT)));
-    expectedSchema.add(MaterializedField.create("Timestamp", 
Types.required(TypeProtos.MinorType.TIMESTAMP)));
-
-    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .build();
-    new RowSetComparison(expected)
-        .verifyAndClearAll(actual);
-
-    String queryWithDiffOrder = "select `timestamp`, src_ip, dst_ip, src_port, 
tcp_session, dst_mac_address from dfs.`store/pcapng/sniff.pcapng`";
-    actual = client.queryBuilder().sql(queryWithDiffOrder).rowSet();
-
-    expectedSchema = new TupleSchema();
-
-    expectedSchema.add(MaterializedField.create("timestamp", 
Types.required(TypeProtos.MinorType.TIMESTAMP)));
-    expectedSchema.add(MaterializedField.create("src_ip", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("dst_ip", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-    expectedSchema.add(MaterializedField.create("src_port", 
Types.optional(TypeProtos.MinorType.INT)));
-    expectedSchema.add(MaterializedField.create("tcp_session", 
Types.optional(TypeProtos.MinorType.BIGINT)));
-    expectedSchema.add(MaterializedField.create("dst_mac_address", 
Types.optional(TypeProtos.MinorType.VARCHAR)));
-
-    expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .build();
-    new RowSetComparison(expected)
-        .verifyAndClearAll(actual);
-  }
-
-  @Test
-  public void testValidHeaderForArrayColumns() throws IOException {
-    // query with non-existent field
-    String query = "select arr[3] as arr from dfs.`store/pcapng/sniff.pcapng`";
-    RowSet actual = client.queryBuilder().sql(query).rowSet();
-
-    TupleMetadata expectedSchema = new TupleSchema();
-
-    expectedSchema.add(MaterializedField.create("arr", 
Types.optional(TypeProtos.MinorType.INT)));
-
-    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .build();
-    new RowSetComparison(expected)
-        .verifyAndClearAll(actual);
-
-    // query with an existent field which doesn't support arrays
-    query = "select type[45] as arr from dfs.`store/pcapng/sniff.pcapng`";
-
-    expectedSchema = new TupleSchema();
-    actual = client.queryBuilder().sql(query).rowSet();
-
-    expectedSchema.add(MaterializedField.create("arr", 
Types.optional(TypeProtos.MinorType.INT)));
-
-    expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .build();
-    new RowSetComparison(expected)
-        .verifyAndClearAll(actual);
-  }
-
-  @Test
-  public void testValidHeaderForNestedColumns() throws IOException {
-    // query with non-existent field
-    String query = "select top['nested'] as nested from 
dfs.`store/pcapng/sniff.pcapng`";
-    RowSet actual = client.queryBuilder().sql(query).rowSet();
-
-    TupleMetadata expectedSchema = new TupleSchema();
-
-    expectedSchema.add(MaterializedField.create("nested", 
Types.optional(TypeProtos.MinorType.INT)));
-
-    RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .build();
-    new RowSetComparison(expected)
-        .verifyAndClearAll(actual);
-
-    // query with an existent field which doesn't support nesting
-    query = "select type['nested'] as nested from 
dfs.`store/pcapng/sniff.pcapng`";
-
-    expectedSchema = new TupleSchema();
-    actual = client.queryBuilder().sql(query).rowSet();
-
-    expectedSchema.add(MaterializedField.create("nested", 
Types.optional(TypeProtos.MinorType.INT)));
-
-    expected = new RowSetBuilder(client.allocator(), expectedSchema)
-        .build();
-    new RowSetComparison(expected)
-        .verifyAndClearAll(actual);
-  }
-}
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
index 98d7b67..d0c4efc 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
@@ -17,84 +17,201 @@
  */
 package org.apache.drill.exec.store.pcapng;
 
-import org.apache.drill.PlanTestBase;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+
+import org.apache.drill.categories.RowSetTests;
 import org.apache.drill.common.exceptions.UserRemoteException;
-import org.junit.Assert;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.QueryTestUtil;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.joda.time.Instant;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
-import java.nio.file.Paths;
+@Category(RowSetTests.class)
+public class TestPcapngRecordReader extends ClusterTest {
 
-public class TestPcapngRecordReader extends PlanTestBase {
   @BeforeClass
-  public static void setupTestFiles() {
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
     dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng"));
   }
 
   @Test
   public void testStarQuery() throws Exception {
-    Assert.assertEquals(123, testSql("select * from dfs.`store/pcapng/sniff.pcapng`"));
-    Assert.assertEquals(1, testSql("select * from dfs.`store/pcapng/example.pcapng`"));
+    String sql = "select * from dfs.`store/pcapng/sniff.pcapng`";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    assertEquals(123, sets.rowCount());
+    sets.clear();
+  }
+
+  @Test
+  public void testExplicitQuery() throws Exception {
+    String sql = "select type, packet_length, `timestamp` from dfs.`store/pcapng/sniff.pcapng` where type = 'ARP'";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("type", MinorType.VARCHAR)
+        .add("packet_length", MinorType.INT)
+        .add("timestamp", MinorType.TIMESTAMP)
+        .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow("ARP", 90, Instant.ofEpochMilli(1518010669927L))
+        .addRow("ARP", 90, Instant.ofEpochMilli(1518010671874L))
+        .build();
+
+    assertEquals(2, sets.rowCount());
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testLimitPushdown() throws Exception {
+    String sql = "select * from dfs.`store/pcapng/sniff.pcapng` where type = 'UDP' limit 10 offset 65";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    assertEquals(6, sets.rowCount());
+    sets.clear();
   }
 
   @Test
-  public void testProjectingByName() throws Exception {
-    Assert.assertEquals(123, testSql("select `timestamp`, packet_data, type from dfs.`store/pcapng/sniff.pcapng`"));
-    Assert.assertEquals(1, testSql("select src_ip, dst_ip, `timestamp` from dfs.`store/pcapng/example.pcapng`"));
+  public void testSerDe() throws Exception {
+    String sql = "select count(*) from dfs.`store/pcapng/example.pcapng`";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+
+    assertEquals("Counts should match", 1, cnt);
   }
 
   @Test
-  public void testDiffCaseQuery() throws Exception {
-    Assert.assertEquals(123, testSql("select `timestamp`, paCket_dAta, TyPe from dfs.`store/pcapng/sniff.pcapng`"));
-    Assert.assertEquals(1, testSql("select src_ip, dst_ip, `Timestamp` from dfs.`store/pcapng/example.pcapng`"));
+  public void testExplicitQueryWithCompressedFile() throws Exception {
+    QueryTestUtil.generateCompressedFile("store/pcapng/sniff.pcapng", "zip", "store/pcapng/sniff.pcapng.zip");
+    String sql = "select type, packet_length, `timestamp` from dfs.`store/pcapng/sniff.pcapng.zip` where type = 'ARP'";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("type", MinorType.VARCHAR)
+        .add("packet_length", MinorType.INT)
+        .add("timestamp", MinorType.TIMESTAMP)
+        .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow("ARP", 90, Instant.ofEpochMilli(1518010669927L))
+        .addRow("ARP", 90, Instant.ofEpochMilli(1518010671874L))
+        .build();
+
+    assertEquals(2, sets.rowCount());
+    new RowSetComparison(expected).verifyAndClearAll(sets);
   }
 
   @Test
-  public void testProjectingMissColls() throws Exception {
-    Assert.assertEquals(123, testSql("select `timestamp`, `name`, `color` from dfs.`store/pcapng/sniff.pcapng`"));
-    Assert.assertEquals(1, testSql("select src_ip, `time` from dfs.`store/pcapng/example.pcapng`"));
+  public void testCaseInsensitiveQuery() throws Exception {
+    String sql = "select `timestamp`, paCket_dAta, TyPe from dfs.`store/pcapng/sniff.pcapng`";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    assertEquals(123, sets.rowCount());
+    sets.clear();
   }
 
+  @Test
+  public void testWhereSyntaxQuery() throws Exception {
+    String sql = "select type, src_ip, dst_ip, packet_length from dfs.`store/pcapng/sniff.pcapng` where src_ip= '10.2.15.239'";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("type", MinorType.VARCHAR)
+        .addNullable("src_ip", MinorType.VARCHAR)
+        .addNullable("dst_ip", MinorType.VARCHAR)
+        .add("packet_length", MinorType.INT)
+        .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow("UDP", "10.2.15.239", "239.255.255.250", 214)
+        .addRow("UDP", "10.2.15.239", "239.255.255.250", 214)
+        .addRow("UDP", "10.2.15.239", "239.255.255.250", 214)
+        .build();
+
+    assertEquals(3, sets.rowCount());
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
 
   @Test
-  public void testCountQuery() throws Exception {
-    testBuilder()
-        .sqlQuery("select count(*) as ct from dfs.`store/pcapng/sniff.pcapng`")
-        .ordered()
-        .baselineColumns("ct")
-        .baselineValues(123L)
-        .build()
-        .run();
-
-    testBuilder()
-        .sqlQuery("select count(*) as ct from dfs.`store/pcapng/example.pcapng`")
-        .ordered()
-        .baselineColumns("ct")
-        .baselineValues(1L)
-        .build()
-        .run();
+  public void testValidHeaders() throws Exception {
+    String sql = "select * from dfs.`store/pcapng/sniff.pcapng`";
+    RowSet sets = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .add("timestamp", MinorType.TIMESTAMP)
+        .add("packet_length", MinorType.INT)
+        .addNullable("type", MinorType.VARCHAR)
+        .addNullable("src_ip", MinorType.VARCHAR)
+        .addNullable("dst_ip", MinorType.VARCHAR)
+        .addNullable("src_port", MinorType.INT)
+        .addNullable("dst_port", MinorType.INT)
+        .addNullable("src_mac_address", MinorType.VARCHAR)
+        .addNullable("dst_mac_address", MinorType.VARCHAR)
+        .addNullable("tcp_session", MinorType.BIGINT)
+        .addNullable("tcp_ack", MinorType.INT)
+        .addNullable("tcp_flags", MinorType.INT)
+        .addNullable("tcp_flags_ns", MinorType.INT)
+        .addNullable("tcp_flags_cwr", MinorType.INT)
+        .addNullable("tcp_flags_ece", MinorType.INT)
+        .addNullable("tcp_flags_ece_ecn_capable", MinorType.INT)
+        .addNullable("tcp_flags_ece_congestion_experienced", MinorType.INT)
+        .addNullable("tcp_flags_urg", MinorType.INT)
+        .addNullable("tcp_flags_ack", MinorType.INT)
+        .addNullable("tcp_flags_psh", MinorType.INT)
+        .addNullable("tcp_flags_rst", MinorType.INT)
+        .addNullable("tcp_flags_syn", MinorType.INT)
+        .addNullable("tcp_flags_fin", MinorType.INT)
+        .addNullable("tcp_parsed_flags", MinorType.VARCHAR)
+        .addNullable("packet_data", MinorType.VARCHAR)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema).build();
+    new RowSetComparison(expected).verifyAndClearAll(sets);
   }
 
   @Test
   public void testGroupBy() throws Exception {
-    Assert.assertEquals(47, testSql("select src_ip, count(1), sum(packet_length) from dfs.`store/pcapng/sniff.pcapng` group by src_ip"));
+    String sql = "select src_ip, count(1), sum(packet_length) from dfs.`store/pcapng/sniff.pcapng` group by src_ip";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    assertEquals(47, sets.rowCount());
+    sets.clear();
   }
 
   @Test
   public void testDistinctQuery() throws Exception {
-    Assert.assertEquals(119, testSql("select distinct `timestamp`, src_ip from dfs.`store/pcapng/sniff.pcapng`"));
-    Assert.assertEquals(1, testSql("select distinct packet_data from dfs.`store/pcapng/example.pcapng`"));
+    String sql = "select distinct `timestamp`, src_ip from dfs.`store/pcapng/sniff.pcapng`";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    assertEquals(119, sets.rowCount());
+    sets.clear();
   }
 
   @Test(expected = UserRemoteException.class)
   public void testBasicQueryWithIncorrectFileName() throws Exception {
-    testSql("select * from dfs.`store/pcapng/snaff.pcapng`");
-  }
-
-  @Test
-  public void testPhysicalPlanExecutionBasedOnQuery() throws Exception {
-    String query = "EXPLAIN PLAN for select * from dfs.`store/pcapng/sniff.pcapng`";
-    String plan = getPlanInString(query, JSON_FORMAT);
-    Assert.assertEquals(123, testPhysical(plan));
+    String sql = "select * from dfs.`store/pcapng/drill.pcapng`";
+    client.queryBuilder().sql(sql).rowSet();
   }
-}
+}
\ No newline at end of file
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java
new file mode 100644
index 0000000..2dbdda7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(RowSetTests.class)
+public class TestPcapngStatRecordReader extends ClusterTest {
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+    cluster.defineFormat("dfs", "pcapng", new PcapngFormatConfig(null, true));
+    dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng"));
+  }
+
+  @Test
+  public void testStarQuery() throws Exception {
+    String sql = "select * from dfs.`store/pcapng/example.pcapng`";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    assertEquals(3, sets.rowCount());
+    sets.clear();
+  }
+
+  @Test
+  public void testExplicitQuery() throws Exception {
+    String sql = "select path, shb_hardware, shb_os, if_name, isb_ifrecv from dfs.`store/pcapng/sniff.pcapng`";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("path", MinorType.VARCHAR)
+        .addNullable("shb_hardware", MinorType.VARCHAR)
+        .addNullable("shb_os", MinorType.VARCHAR)
+        .addNullable("if_name", MinorType.VARCHAR)
+        .addNullable("isb_ifrecv", MinorType.BIGINT)
+        .buildSchema();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema)
+        .addRow("sniff.pcapng", "Intel(R) Core(TM) i7-6700HQ CPU @ 2.60GHz (with SSE4.2)",
+            "Mac OS X 10.13.3, build 17D47 (Darwin 17.4.0)", null, null)
+        .addRow("sniff.pcapng", null, null, "en0", null)
+        .addRow("sniff.pcapng", null, null, null, 123)
+        .build();
+
+    assertEquals(3, sets.rowCount());
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+
+  @Test
+  public void testLimitPushdown() throws Exception {
+    String sql = "select * from dfs.`store/pcapng/example.pcapng` limit 2";
+    QueryBuilder builder = client.queryBuilder().sql(sql);
+    RowSet sets = builder.rowSet();
+
+    assertEquals(2, sets.rowCount());
+    sets.clear();
+  }
+
+  @Test
+  public void testSerDe() throws Exception {
+    String sql = "select count(*) from dfs.`store/pcapng/*.pcapng`";
+    String plan = queryBuilder().sql(sql).explainJson();
+    long cnt = queryBuilder().physical(plan).singletonLong();
+
+    assertEquals("Counts should match", 6, cnt);
+  }
+
+  @Test
+  public void testValidHeaders() throws Exception {
+    String sql = "select * from dfs.`store/pcapng/sniff.pcapng`";
+    RowSet sets = client.queryBuilder().sql(sql).rowSet();
+
+    TupleMetadata schema = new SchemaBuilder()
+        .addNullable("path", MinorType.VARCHAR)
+        .addNullable("shb_hardware", MinorType.VARCHAR)
+        .addNullable("shb_os", MinorType.VARCHAR)
+        .addNullable("shb_userappl", MinorType.VARCHAR)
+        .addNullable("if_name", MinorType.VARCHAR)
+        .addNullable("if_description", MinorType.VARCHAR)
+        .addNullable("if_ipv4addr", MinorType.VARCHAR)
+        .addNullable("if_ipv6addr", MinorType.VARCHAR)
+        .addNullable("if_macaddr", MinorType.VARCHAR)
+        .addNullable("if_euiaddr", MinorType.VARCHAR)
+        .addNullable("if_speed", MinorType.INT)
+        .addNullable("if_tsresol", MinorType.INT)
+        .addNullable("if_tzone", MinorType.INT)
+        .addNullable("if_os", MinorType.VARCHAR)
+        .addNullable("if_fcslen", MinorType.INT)
+        .addNullable("if_tsoffset", MinorType.INT)
+        .addNullable("ns_dnsname", MinorType.VARCHAR)
+        .addNullable("ns_dnsip4addr", MinorType.VARCHAR)
+        .addNullable("ns_dnsip6addr", MinorType.VARCHAR)
+        .addNullable("isb_starttime", MinorType.TIMESTAMP)
+        .addNullable("isb_endtime", MinorType.TIMESTAMP)
+        .addNullable("isb_ifrecv", MinorType.BIGINT)
+        .addNullable("isb_ifdrop", MinorType.BIGINT)
+        .addNullable("isb_filteraccept", MinorType.BIGINT)
+        .addNullable("isb_osdrop", MinorType.BIGINT)
+        .addNullable("isb_usrdeliv", MinorType.BIGINT)
+        .build();
+
+    RowSet expected = new RowSetBuilder(client.allocator(), schema).build();
+    new RowSetComparison(expected).verifyAndClearAll(sets);
+  }
+}
\ No newline at end of file

Reply via email to