This is an automated email from the ASF dual-hosted git repository.
cgivre pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
The following commit(s) were added to refs/heads/master by this push:
new 29716db DRILL-7533: Convert Pcapng to EVF
29716db is described below
commit 29716db80cee6dc82a7045a020b0c07053a61bd8
Author: luoc <[email protected]>
AuthorDate: Wed Dec 23 17:11:34 2020 +0800
DRILL-7533: Convert Pcapng to EVF
---
.../apache/drill/exec/store/pcapng/PcapColumn.java | 1020 ++++++++++++++++++++
.../drill/exec/store/pcapng/PcapngBatchReader.java | 275 ++++++
.../exec/store/pcapng/PcapngFormatConfig.java | 38 +-
.../exec/store/pcapng/PcapngFormatPlugin.java | 103 +-
.../exec/store/pcapng/PcapngRecordReader.java | 214 ----
.../drill/exec/store/pcapng/schema/Column.java | 28 -
.../exec/store/pcapng/schema/DummyArrayImpl.java | 34 -
.../drill/exec/store/pcapng/schema/DummyImpl.java | 34 -
.../drill/exec/store/pcapng/schema/Schema.java | 441 ---------
.../drill/exec/store/pcapng/schema/Util.java | 59 --
.../store/dfs/TestFormatPluginOptionExtractor.java | 6 +
.../drill/exec/store/pcapng/TestPcapngHeaders.java | 213 ----
.../exec/store/pcapng/TestPcapngRecordReader.java | 205 +++-
.../store/pcapng/TestPcapngStatRecordReader.java | 139 +++
14 files changed, 1678 insertions(+), 1131 deletions(-)
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapColumn.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapColumn.java
new file mode 100644
index 0000000..ec9d5f5
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapColumn.java
@@ -0,0 +1,1020 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.drill.common.types.TypeProtos.MajorType;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import org.apache.drill.exec.store.pcap.PcapFormatUtils;
+import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.joda.time.Instant;
+
+import fr.bmartel.pcapdecoder.structure.options.inter.IOptionsStatisticsHeader;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IDescriptionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+import fr.bmartel.pcapdecoder.structure.types.inter.INameResolutionBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.ISectionHeaderBlock;
+import fr.bmartel.pcapdecoder.structure.types.inter.IStatisticsBlock;
+
+public abstract class PcapColumn {
+
+ private static final Map<String, PcapColumn> columns = new LinkedHashMap<>();
+ private static final Map<String, PcapColumn> summary_columns = new
LinkedHashMap<>();
+ public static final String DUMMY_NAME = "dummy";
+ public static final String PATH_NAME = "path";
+
+ static {
+ // Basic per-packet columns; both maps are LinkedHashMaps, so registration order is kept
+ columns.put("timestamp", new PcapTimestamp());
+ columns.put("packet_length", new PcapPacketLength());
+ columns.put("type", new PcapType());
+ columns.put("src_ip", new PcapSrcIp());
+ columns.put("dst_ip", new PcapDstIp());
+ columns.put("src_port", new PcapSrcPort());
+ columns.put("dst_port", new PcapDstPort());
+ columns.put("src_mac_address", new PcapSrcMac());
+ columns.put("dst_mac_address", new PcapDstMac());
+ columns.put("tcp_session", new PcapTcpSession());
+ columns.put("tcp_ack", new PcapTcpAck());
+ columns.put("tcp_flags", new PcapTcpFlags());
+ columns.put("tcp_flags_ns", new PcapTcpFlagsNs());
+ columns.put("tcp_flags_cwr", new PcapTcpFlagsCwr());
+ columns.put("tcp_flags_ece", new PcapTcpFlagsEce());
+ columns.put("tcp_flags_ece_ecn_capable", new PcapTcpFlagsEceEcnCapable());
+ columns.put("tcp_flags_ece_congestion_experienced", new
PcapTcpFlagsEceCongestionExperienced());
+ columns.put("tcp_flags_urg", new PcapTcpFlagsUrg());
+ columns.put("tcp_flags_ack", new PcapTcpFlagsAck());
+ columns.put("tcp_flags_psh", new PcapTcpFlagsPsh());
+ columns.put("tcp_flags_rst", new PcapTcpFlagsRst());
+ columns.put("tcp_flags_syn", new PcapTcpFlagsSyn());
+ columns.put("tcp_flags_fin", new PcapTcpFlagsFin());
+ columns.put("tcp_parsed_flags", new PcapTcpParsedFlags());
+ columns.put("packet_data", new PcapPacketData());
+
+ // Extensions: summary columns, grouped below by the pcapng block type they read
+ summary_columns.put("path", new PcapStatPath());
+ // Section Header Block (shb_*)
+ summary_columns.put("shb_hardware", new PcapHardware());
+ summary_columns.put("shb_os", new PcapOS());
+ summary_columns.put("shb_userappl", new PcapUserAppl());
+ // Interface Description Block (if_*)
+ summary_columns.put("if_name", new PcapIfName());
+ summary_columns.put("if_description", new PcapIfDescription());
+ summary_columns.put("if_ipv4addr", new PcapIfIPv4addr());
+ summary_columns.put("if_ipv6addr", new PcapIfIPv6addr());
+ summary_columns.put("if_macaddr", new PcapIfMACaddr());
+ summary_columns.put("if_euiaddr", new PcapIfEUIaddr());
+ summary_columns.put("if_speed", new PcapIfSpeed());
+ summary_columns.put("if_tsresol", new PcapIfTsresol());
+ summary_columns.put("if_tzone", new PcapIfTzone());
+ summary_columns.put("if_os", new PcapIfOS());
+ summary_columns.put("if_fcslen", new PcapIfFcslen());
+ summary_columns.put("if_tsoffset", new PcapIfTsOffset());
+ // Name Resolution Block (ns_*)
+ summary_columns.put("ns_dnsname", new PcapDnsName());
+ summary_columns.put("ns_dnsip4addr", new PcapDnsIP4addr());
+ summary_columns.put("ns_dnsip6addr", new PcapDnsIP6addr());
+ // Interface Statistics Block (isb_*)
+ summary_columns.put("isb_starttime", new PcapIsbStarttime());
+ summary_columns.put("isb_endtime", new PcapIsbEndtime());
+ summary_columns.put("isb_ifrecv", new PcapIsbIfrecv());
+ summary_columns.put("isb_ifdrop", new PcapIsbIfdrop());
+ summary_columns.put("isb_filteraccept", new PcapIsbFilterAccept());
+ summary_columns.put("isb_osdrop", new PcapIsbOSdrop());
+ summary_columns.put("isb_usrdeliv", new PcapIsbUsrdeliv());
+ }
+
+ abstract MajorType getType();
+
+ abstract void process(IPcapngType block, ScalarWriter writer);
+
+  /**
+   * Returns a read-only view of the per-packet columns, keyed by column name
+   * and iterated in registration order.
+   */
+  public static Map<String, PcapColumn> getColumns() {
+    final Map<String, PcapColumn> readOnlyView = Collections.unmodifiableMap(columns);
+    return readOnlyView;
+  }
+
+  /**
+   * Returns a read-only view of the summary columns, keyed by column name
+   * and iterated in registration order.
+   */
+  public static Map<String, PcapColumn> getSummaryColumns() {
+    final Map<String, PcapColumn> readOnlyView = Collections.unmodifiableMap(summary_columns);
+    return readOnlyView;
+  }
+
+  /**
+   * Placeholder column: typed as optional INT, never writes a value.
+   */
+  static class PcapDummy extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      // Intentionally empty: nothing is written for this column.
+    }
+  }
+
+  /**
+   * path: optional VARCHAR summary column. This class writes nothing itself.
+   * NOTE(review): presumably the reader fills in the path value - confirm.
+   */
+  static class PcapStatPath extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      // Intentionally empty: nothing is derived from the block here.
+    }
+  }
+
+  /**
+   * timestamp: required TIMESTAMP taken from the Enhanced Packet Block.
+   * The block value is divided by 1000 before being used as epoch milliseconds.
+   * NOTE(review): presumably the raw value is in microseconds - confirm.
+   */
+  static class PcapTimestamp extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.required(MinorType.TIMESTAMP);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final IEnhancedPacketBLock packetBlock = (IEnhancedPacketBLock) block;
+      final long epochMillis = packetBlock.getTimeStamp() / 1000;
+      writer.setTimestamp(Instant.ofEpochMilli(epochMillis));
+    }
+  }
+
+  /**
+   * packet_length: required INT holding the packet length reported by the
+   * Enhanced Packet Block.
+   */
+  static class PcapPacketLength extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.required(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final IEnhancedPacketBLock packetBlock = (IEnhancedPacketBLock) block;
+      writer.setInt(packetBlock.getPacketLength());
+    }
+  }
+
+  /**
+   * type: optional VARCHAR holding the packet type reported by the decoder.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapType extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      writer.setString(decoder.getPacketType());
+    }
+  }
+
+  /**
+   * src_ip: optional VARCHAR holding the host address of the packet's source IP.
+   * Left unset when the packet data cannot be decoded or no source address is
+   * available.
+   */
+  static class PcapSrcIp extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (!packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        return;
+      }
+      // NOTE(review): presumably getSrc_ip() yields null for frames without an IP
+      // header (e.g. ARP) - confirm against PacketDecoder. The column is optional,
+      // so leave it unset instead of failing with a NullPointerException.
+      if (packet.getSrc_ip() != null) {
+        writer.setString(packet.getSrc_ip().getHostAddress());
+      }
+    }
+  }
+
+  /**
+   * dst_ip: optional VARCHAR holding the host address of the packet's
+   * destination IP. Left unset when the packet data cannot be decoded or no
+   * destination address is available.
+   */
+  static class PcapDstIp extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      PacketDecoder packet = new PacketDecoder();
+      if (!packet.readPcapng(((IEnhancedPacketBLock) block).getPacketData())) {
+        return;
+      }
+      // NOTE(review): presumably getDst_ip() yields null for frames without an IP
+      // header (e.g. ARP) - confirm against PacketDecoder. The column is optional,
+      // so leave it unset instead of failing with a NullPointerException.
+      if (packet.getDst_ip() != null) {
+        writer.setString(packet.getDst_ip().getHostAddress());
+      }
+    }
+  }
+
+  /**
+   * src_port: optional INT holding the source port reported by the decoder.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapSrcPort extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      writer.setInt(decoder.getSrc_port());
+    }
+  }
+
+  /**
+   * dst_port: optional INT holding the destination port reported by the decoder.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapDstPort extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      writer.setInt(decoder.getDst_port());
+    }
+  }
+
+  /**
+   * src_mac_address: optional VARCHAR holding the Ethernet source reported by
+   * the decoder. Left unset when the packet data cannot be decoded.
+   */
+  static class PcapSrcMac extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      writer.setString(decoder.getEthernetSource());
+    }
+  }
+
+  /**
+   * dst_mac_address: optional VARCHAR holding the Ethernet destination reported
+   * by the decoder. Left unset when the packet data cannot be decoded.
+   */
+  static class PcapDstMac extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      writer.setString(decoder.getEthernetDestination());
+    }
+  }
+
+  /**
+   * tcp_session: optional BIGINT holding the session hash reported by the
+   * decoder. Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpSession extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      writer.setLong(decoder.getSessionHash());
+    }
+  }
+
+  /**
+   * tcp_ack: optional INT holding the acknowledgement number reported by the
+   * decoder. Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpAck extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      writer.setInt(decoder.getAckNumber());
+    }
+  }
+
+  /**
+   * tcp_flags: optional INT holding the raw flags value reported by the decoder.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlags extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      writer.setInt(decoder.getFlags());
+    }
+  }
+
+  /**
+   * tcp_flags_ns: whether bit 0x100 of the decoded flags is set.
+   * The column type is optional INT, but the value is written with setBoolean.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlagsNs extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      final boolean nsSet = (decoder.getFlags() & 0x100) != 0;
+      writer.setBoolean(nsSet);
+    }
+  }
+
+  /**
+   * tcp_flags_cwr: whether bit 0x80 of the decoded flags is set.
+   * The column type is optional INT, but the value is written with setBoolean.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlagsCwr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      final boolean cwrSet = (decoder.getFlags() & 0x80) != 0;
+      writer.setBoolean(cwrSet);
+    }
+  }
+
+  /**
+   * tcp_flags_ece: whether bit 0x40 of the decoded flags is set.
+   * The column type is optional INT, but the value is written with setBoolean.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlagsEce extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      final boolean eceSet = (decoder.getFlags() & 0x40) != 0;
+      writer.setBoolean(eceSet);
+    }
+  }
+
+  /**
+   * tcp_flags_ece_ecn_capable: true when both bits of mask 0x42 (0x40 and 0x2,
+   * the masks used by the ECE and SYN columns) are set in the decoded flags.
+   * The column type is optional INT, but the value is written with setBoolean.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlagsEceEcnCapable extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      final boolean ecnCapable = (decoder.getFlags() & 0x42) == 0x42;
+      writer.setBoolean(ecnCapable);
+    }
+  }
+
+  /**
+   * tcp_flags_ece_congestion_experienced: true when, within mask 0x42, only bit
+   * 0x40 is set (the ECE column's bit set, the SYN column's bit 0x2 clear).
+   * The column type is optional INT, but the value is written with setBoolean.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlagsEceCongestionExperienced extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      final boolean congestionExperienced = (decoder.getFlags() & 0x42) == 0x40;
+      writer.setBoolean(congestionExperienced);
+    }
+  }
+
+  /**
+   * tcp_flags_urg: whether bit 0x20 of the decoded flags is set.
+   * The column type is optional INT, but the value is written with setBoolean.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlagsUrg extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      final boolean urgSet = (decoder.getFlags() & 0x20) != 0;
+      writer.setBoolean(urgSet);
+    }
+  }
+
+  /**
+   * tcp_flags_ack: whether bit 0x10 of the decoded flags is set.
+   * The column type is optional INT, but the value is written with setBoolean.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlagsAck extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      final boolean ackSet = (decoder.getFlags() & 0x10) != 0;
+      writer.setBoolean(ackSet);
+    }
+  }
+
+  /**
+   * tcp_flags_psh: whether bit 0x8 of the decoded flags is set.
+   * The column type is optional INT, but the value is written with setBoolean.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlagsPsh extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      final boolean pshSet = (decoder.getFlags() & 0x8) != 0;
+      writer.setBoolean(pshSet);
+    }
+  }
+
+  /**
+   * tcp_flags_rst: whether bit 0x4 of the decoded flags is set.
+   * The column type is optional INT, but the value is written with setBoolean.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlagsRst extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      final boolean rstSet = (decoder.getFlags() & 0x4) != 0;
+      writer.setBoolean(rstSet);
+    }
+  }
+
+  /**
+   * tcp_flags_syn: whether bit 0x2 of the decoded flags is set.
+   * The column type is optional INT, but the value is written with setBoolean.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlagsSyn extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      final boolean synSet = (decoder.getFlags() & 0x2) != 0;
+      writer.setBoolean(synSet);
+    }
+  }
+
+  /**
+   * tcp_flags_fin: whether bit 0x1 of the decoded flags is set.
+   * The column type is optional INT, but the value is written with setBoolean.
+   * Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpFlagsFin extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      final boolean finSet = (decoder.getFlags() & 0x1) != 0;
+      writer.setBoolean(finSet);
+    }
+  }
+
+  /**
+   * tcp_parsed_flags: optional VARCHAR holding the parsed-flags string reported
+   * by the decoder. Left unset when the packet data cannot be decoded.
+   */
+  static class PcapTcpParsedFlags extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      writer.setString(decoder.getParsedFlags());
+    }
+  }
+
+  /**
+   * packet_data: optional VARCHAR holding the block's packet data rendered by
+   * PcapFormatUtils.parseBytesToASCII. Left unset when the packet data cannot
+   * be decoded.
+   */
+  static class PcapPacketData extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      final PacketDecoder decoder = new PacketDecoder();
+      final boolean decoded = decoder.readPcapng(((IEnhancedPacketBLock) block).getPacketData());
+      if (!decoded) {
+        return;
+      }
+      // The payload is read from the block a second time, after the decode check.
+      writer.setString(
+          PcapFormatUtils.parseBytesToASCII(((IEnhancedPacketBLock) block).getPacketData()));
+    }
+  }
+
+  /**
+   * shb_hardware: description of the hardware, read from the options of a
+   * Section Header Block. Any other block type is ignored.
+   */
+  static class PcapHardware extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof ISectionHeaderBlock) {
+        final ISectionHeaderBlock sectionHeader = (ISectionHeaderBlock) block;
+        writer.setString(sectionHeader.getOptions().getHardware());
+      }
+    }
+  }
+
+ // Section Header Block
+
+  /**
+   * shb_os: name of the OS, read from the options of a Section Header Block.
+   * Any other block type is ignored.
+   */
+  static class PcapOS extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof ISectionHeaderBlock) {
+        final ISectionHeaderBlock sectionHeader = (ISectionHeaderBlock) block;
+        writer.setString(sectionHeader.getOptions().getOS());
+      }
+    }
+  }
+
+  /**
+   * shb_userappl: name of the user application, read from the options of a
+   * Section Header Block. Any other block type is ignored.
+   */
+  static class PcapUserAppl extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof ISectionHeaderBlock) {
+        final ISectionHeaderBlock sectionHeader = (ISectionHeaderBlock) block;
+        writer.setString(sectionHeader.getOptions().getUserAppl());
+      }
+    }
+  }
+
+ // Interface Description Block
+
+  /**
+   * if_name: name of the device used to capture, read from the options of an
+   * Interface Description Block. Any other block type is ignored.
+   */
+  static class PcapIfName extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setString(description.getOptions().getInterfaceName());
+      }
+    }
+  }
+
+  /**
+   * if_description: description of the device used to capture the data, read
+   * from the options of an Interface Description Block. Any other block type
+   * is ignored.
+   */
+  static class PcapIfDescription extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setString(description.getOptions().getInterfaceDescription());
+      }
+    }
+  }
+
+  /**
+   * if_IPv4addr: IPv4 address, read from the options of an Interface
+   * Description Block. Any other block type is ignored.
+   */
+  static class PcapIfIPv4addr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setString(description.getOptions().getInterfaceIpv4NetworkAddr());
+      }
+    }
+  }
+
+  /**
+   * if_IPv6addr: IPv6 address, read from the options of an Interface
+   * Description Block. Any other block type is ignored.
+   */
+  static class PcapIfIPv6addr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setString(description.getOptions().getIpv6NetworkAddr());
+      }
+    }
+  }
+
+  /**
+   * if_MACaddr: MAC address, read from the options of an Interface Description
+   * Block. Any other block type is ignored.
+   */
+  static class PcapIfMACaddr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setString(description.getOptions().getInterfaceMacAddr());
+      }
+    }
+  }
+
+  /**
+   * if_EUIaddr: EUI address, read from the options of an Interface Description
+   * Block. Any other block type is ignored.
+   */
+  static class PcapIfEUIaddr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setString(description.getOptions().getInterfaceEuiAddr());
+      }
+    }
+  }
+
+  /**
+   * if_speed: interface speed in bps, read from the options of an Interface
+   * Description Block. Any other block type is ignored.
+   */
+  static class PcapIfSpeed extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setInt(description.getOptions().getInterfaceSpeed());
+      }
+    }
+  }
+
+  /**
+   * if_tsresol: resolution of timestamps (for instance, 6 means microsecond
+   * resolution), read from the options of an Interface Description Block.
+   * Any other block type is ignored.
+   */
+  static class PcapIfTsresol extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setInt(description.getOptions().getTimeStampResolution());
+      }
+    }
+  }
+
+  /**
+   * if_tzone: time zone expressed as an offset from UTC, read from the time
+   * bias option of an Interface Description Block. Any other block type is
+   * ignored.
+   */
+  static class PcapIfTzone extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setInt(description.getOptions().getTimeBias());
+      }
+    }
+  }
+
+  /**
+   * if_os: name of the operating system, read from the options of an Interface
+   * Description Block. Any other block type is ignored.
+   */
+  static class PcapIfOS extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setString(description.getOptions().getInterfaceOperatingSystem());
+      }
+    }
+  }
+
+  /**
+   * if_fcslen: length of the Frame Check Sequence (in bits), read from the
+   * options of an Interface Description Block. Any other block type is ignored.
+   */
+  static class PcapIfFcslen extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setInt(description.getOptions().getInterfaceFrameCheckSequenceLength());
+      }
+    }
+  }
+
+  /**
+   * if_tsoffset: timestamp offset applied to each packet; if not present,
+   * timestamps are absolute. Read from the options of an Interface Description
+   * Block. Any other block type is ignored.
+   */
+  static class PcapIfTsOffset extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.INT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IDescriptionBlock) {
+        final IDescriptionBlock description = (IDescriptionBlock) block;
+        writer.setInt(description.getOptions().getTimeStampOffset());
+      }
+    }
+  }
+
+ // Name Resolution Block
+
+  /**
+   * ns_dnsname: DNS server name, read from the options of a Name Resolution
+   * Block. Any other block type is ignored.
+   */
+  static class PcapDnsName extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof INameResolutionBlock) {
+        final INameResolutionBlock nameResolution = (INameResolutionBlock) block;
+        writer.setString(nameResolution.getOptions().getDnsName());
+      }
+    }
+  }
+
+  /**
+   * ns_dnsIP4addr: DNS server IPv4 address, read from the options of a Name
+   * Resolution Block. Any other block type is ignored.
+   */
+  static class PcapDnsIP4addr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof INameResolutionBlock) {
+        final INameResolutionBlock nameResolution = (INameResolutionBlock) block;
+        writer.setString(nameResolution.getOptions().getDnsIpv4Addr());
+      }
+    }
+  }
+
+  /**
+   * ns_dnsIP6addr: DNS server IPv6 address, read from the options of a Name
+   * Resolution Block. Any other block type is ignored.
+   */
+  static class PcapDnsIP6addr extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.VARCHAR);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof INameResolutionBlock) {
+        final INameResolutionBlock nameResolution = (INameResolutionBlock) block;
+        writer.setString(nameResolution.getOptions().getDnsIpv6Addr());
+      }
+    }
+  }
+
+ // Interface Statistics Block
+
+  /**
+   * isb_starttime: capture start time, read from the options of an Interface
+   * Statistics Block. Any other block type is ignored. The timestamp resolution
+   * is defined in the Interface Description Block (see if_tsresol); here the
+   * raw value is divided by 1000 and used as epoch milliseconds.
+   * NOTE(review): presumably the raw value is in microseconds - confirm.
+   */
+  static class PcapIsbStarttime extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.TIMESTAMP);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IStatisticsBlock) {
+        final IOptionsStatisticsHeader options = ((IStatisticsBlock) block).getOptions();
+        final long startMillis = options.getCaptureStartTime() / 1000;
+        writer.setTimestamp(Instant.ofEpochMilli(startMillis));
+      }
+    }
+  }
+
+  /**
+   * isb_endtime: capture end time, read from the options of an Interface
+   * Statistics Block. Any other block type is ignored. The timestamp resolution
+   * is defined in the Interface Description Block (see if_tsresol); here the
+   * raw value is divided by 1000 and used as epoch milliseconds.
+   * NOTE(review): presumably the raw value is in microseconds - confirm.
+   */
+  static class PcapIsbEndtime extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.TIMESTAMP);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IStatisticsBlock) {
+        final IOptionsStatisticsHeader options = ((IStatisticsBlock) block).getOptions();
+        final long endMillis = options.getCaptureEndTime() / 1000;
+        writer.setTimestamp(Instant.ofEpochMilli(endMillis));
+      }
+    }
+  }
+
+  /**
+   * isb_ifrecv: count of packets received, read from the options of an
+   * Interface Statistics Block. Any other block type is ignored.
+   */
+  static class PcapIsbIfrecv extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IStatisticsBlock) {
+        final IStatisticsBlock statistics = (IStatisticsBlock) block;
+        writer.setLong(statistics.getOptions().getPacketReceivedCount());
+      }
+    }
+  }
+
+  /**
+   * isb_ifdrop: count of packets dropped, read from the options of an
+   * Interface Statistics Block. Any other block type is ignored.
+   */
+  static class PcapIsbIfdrop extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IStatisticsBlock) {
+        final IStatisticsBlock statistics = (IStatisticsBlock) block;
+        writer.setLong(statistics.getOptions().getPacketDropCount());
+      }
+    }
+  }
+
+  /**
+   * isb_filteraccept: count of packets accepted by the filter, read from the
+   * options of an Interface Statistics Block. Any other block type is ignored.
+   */
+  static class PcapIsbFilterAccept extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IStatisticsBlock) {
+        final IStatisticsBlock statistics = (IStatisticsBlock) block;
+        writer.setLong(statistics.getOptions().getPacketAcceptedByFilterCount());
+      }
+    }
+  }
+
+  /**
+   * isb_osdrop: count of packets dropped by the operating system, read from
+   * the options of an Interface Statistics Block. Any other block type is
+   * ignored.
+   */
+  static class PcapIsbOSdrop extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IStatisticsBlock) {
+        final IStatisticsBlock statistics = (IStatisticsBlock) block;
+        writer.setLong(statistics.getOptions().getPacketDroppedByOS());
+      }
+    }
+  }
+
+  /**
+   * isb_usrdeliv: count of packets delivered to the user, read from the
+   * options of an Interface Statistics Block. Any other block type is ignored.
+   */
+  static class PcapIsbUsrdeliv extends PcapColumn {
+
+    @Override
+    MajorType getType() {
+      return Types.optional(MinorType.BIGINT);
+    }
+
+    @Override
+    void process(IPcapngType block, ScalarWriter writer) {
+      if (block instanceof IStatisticsBlock) {
+        final IStatisticsBlock statistics = (IStatisticsBlock) block;
+        writer.setLong(statistics.getOptions().getPacketDeliveredToUser());
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
new file mode 100644
index 0000000..eeabebf
--- /dev/null
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngBatchReader.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.CustomErrorContext;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.physical.resultSet.ResultSetLoader;
+import org.apache.drill.exec.physical.resultSet.RowSetLoader;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
+import org.apache.drill.exec.util.Utilities;
+import org.apache.drill.exec.vector.accessor.ScalarWriter;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import fr.bmartel.pcapdecoder.PcapDecoder;
+import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
+import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
+
+/**
+ * Batch reader for pcapng files. The file is decoded completely when the
+ * reader is opened; afterwards every decoded block that matches the configured
+ * mode is written as one row: Enhanced Packet Blocks by default, every other
+ * block type when {@code stat} is enabled in the format config.
+ */
+public class PcapngBatchReader implements ManagedReader<FileSchemaNegotiator> {
+
+  private static final Logger logger = LoggerFactory.getLogger(PcapngBatchReader.class);
+
+  private final PcapngFormatConfig config;
+  private final EasySubScan scan;
+  private final int maxRecords;
+  private final List<SchemaPath> columns;
+  private CustomErrorContext errorContext;
+  private List<ColumnDefn> projectedColumns;
+  private Iterator<IPcapngType> pcapIterator;
+  // the block most recently taken from the iterator; consumed by processBlock()
+  private IPcapngType block;
+  private RowSetLoader loader;
+  private InputStream in;
+  private Path path;
+
+  /**
+   * @param config format config; its {@code stat} flag selects which blocks are returned
+   * @param scan sub scan that supplies the projected columns and the record limit
+   */
+  public PcapngBatchReader(final PcapngFormatConfig config, final EasySubScan scan) {
+    this.config = config;
+    this.scan = scan;
+    this.maxRecords = scan.getMaxRecords();
+    this.columns = scan.getColumns();
+  }
+
+  /**
+   * Opens and decodes the file of the current split, then defines the schema
+   * and binds one writer per projected column.
+   *
+   * @param negotiator gives access to the file system, the split and the result set loader
+   * @return always {@code true}
+   * @throws UserException (data read error) if the file cannot be read or decoded
+   */
+  @Override
+  public boolean open(FileSchemaNegotiator negotiator) {
+    try {
+      // init InputStream for pcap file
+      errorContext = negotiator.parentErrorContext();
+      DrillFileSystem dfs = negotiator.fileSystem();
+      path = dfs.makeQualified(negotiator.split().getPath());
+      in = dfs.openPossiblyCompressedStream(path);
+      // decode the pcap file; the decoder takes a byte array, so the whole
+      // stream content is read into memory here
+      PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
+      decoder.decode();
+      pcapIterator = decoder.getSectionList().iterator();
+      logger.debug("The config is {}, root is {}, columns has {}", config, scan.getSelectionRoot(), columns);
+    } catch (IOException e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Failure in initial pcapng inputstream. " + e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+    } catch (Exception e) {
+      throw UserException
+          .dataReadError(e)
+          .message("Failed to decode the pcapng file. " + e.getMessage())
+          .addContext(errorContext)
+          .build(logger);
+    }
+    // define the schema
+    negotiator.tableSchema(defineMetadata(), true);
+    ResultSetLoader resultSetLoader = negotiator.build();
+    loader = resultSetLoader.writer();
+    // bind the writer for columns
+    bindColumns(loader);
+    return true;
+  }
+
+  /**
+   * Fills the current batch with rows.
+   * <p>
+   * The {@code stat} option defaults to false, in which case only the packet
+   * data (Enhanced Packet Blocks) is returned. When it is true, only the
+   * statistics data of each pcapng file is returned (information about the
+   * capture devices and a summary of the packet data), i.e. every block that
+   * is not an Enhanced Packet Block.
+   * <p>
+   * A pcapng file contains a Section Header Block (SHB), an Interface
+   * Description Block (IDB) and a number of Enhanced Packet Blocks (EPB).
+   * <pre>
+   * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   * | SHB | IDB | EPB | EPB |    ...    | EPB |
+   * +-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+-+
+   * </pre>
+   * https://pcapng.github.io/pcapng/draft-tuexen-opsawg-pcapng.html#name-physical-file-layout
+   *
+   * @return {@code true} if the batch filled up before the blocks ran out,
+   *         {@code false} once all blocks are consumed or the record limit is reached
+   */
+  @Override
+  public boolean next() {
+    while (!loader.isFull()) {
+      if (!pcapIterator.hasNext()) {
+        return false;
+      }
+      // advance exactly once per iteration
+      block = pcapIterator.next();
+      boolean isPacketBlock = block instanceof IEnhancedPacketBLock;
+      // stat mode skips packet blocks, packet mode skips everything else
+      if (config.getStat() == isPacketBlock) {
+        continue;
+      }
+      processBlock();
+      if (loader.limitReached(maxRecords)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /** Closes the input stream opened in {@link #open}, ignoring close failures. */
+  @Override
+  public void close() {
+    AutoCloseables.closeSilently(in);
+  }
+
+  /** Writes the current block as one row, column by column. */
+  private void processBlock() {
+    loader.start();
+    for (ColumnDefn columnDefn : projectedColumns) {
+      // pcapng file name
+      if (columnDefn.getName().equals(PcapColumn.PATH_NAME)) {
+        columnDefn.load(path.getName());
+      } else {
+        // pcapng block data
+        columnDefn.load(block);
+      }
+    }
+    loader.save();
+  }
+
+  /** @return true when no column is projected at all, such as count(*) */
+  private boolean isSkipQuery() {
+    return columns.isEmpty();
+  }
+
+  /** @return true for a {@code select *} projection */
+  private boolean isStarQuery() {
+    return Utilities.isStarQuery(columns);
+  }
+
+  /** Resolves the projected columns and builds the matching table schema. */
+  private TupleMetadata defineMetadata() {
+    SchemaBuilder builder = new SchemaBuilder();
+    processProjected(columns);
+    for (ColumnDefn columnDefn : projectedColumns) {
+      columnDefn.define(builder);
+    }
+    return builder.buildSchema();
+  }
+
+  /**
+   * <b> Define the schema based on projected </b><br/>
+   * 1. SkipQuery: no field specified, such as count(*) <br/>
+   * 2. StarQuery: select * <br/>
+   * 3. ProjectPushdownQuery: select a,b,c <br/>
+   * <p>
+   * The result is stored in {@code projectedColumns} as an unmodifiable list.
+   */
+  private void processProjected(List<SchemaPath> columns) {
+    List<ColumnDefn> defns = new ArrayList<>();
+    if (isSkipQuery()) {
+      defns.add(new ColumnDefn(PcapColumn.DUMMY_NAME, new PcapColumn.PcapDummy()));
+    } else if (isStarQuery()) {
+      Set<Map.Entry<String, PcapColumn>> pcapColumns;
+      if (config.getStat()) {
+        pcapColumns = PcapColumn.getSummaryColumns().entrySet();
+      } else {
+        pcapColumns = PcapColumn.getColumns().entrySet();
+      }
+      for (Map.Entry<String, PcapColumn> pcapColumn : pcapColumns) {
+        makePcapColumns(defns, pcapColumn.getKey(), pcapColumn.getValue());
+      }
+    } else {
+      for (SchemaPath schemaPath : columns) {
+        // Support Case-Insensitive
+        String projectedName = schemaPath.rootName().toLowerCase();
+        PcapColumn pcapColumn;
+        if (config.getStat()) {
+          pcapColumn = PcapColumn.getSummaryColumns().get(projectedName);
+        } else {
+          pcapColumn = PcapColumn.getColumns().get(projectedName);
+        }
+        if (pcapColumn != null) {
+          makePcapColumns(defns, projectedName, pcapColumn);
+        } else {
+          // unknown column name: fall back to the dummy column
+          makePcapColumns(defns, projectedName, new PcapColumn.PcapDummy());
+          logger.debug("{} missing the PcapColumn implement class.", projectedName);
+        }
+      }
+    }
+    // keep the wrapper: Collections.unmodifiableList returns a view and does
+    // not change the list passed to it
+    projectedColumns = Collections.unmodifiableList(defns);
+  }
+
+  /** Appends a column definition with the given name and processor to the list. */
+  private void makePcapColumns(List<ColumnDefn> projectedColumns, String name, PcapColumn column) {
+    projectedColumns.add(new ColumnDefn(name, column));
+  }
+
+  /** Looks up the scalar writer of every projected column in the row set loader. */
+  private void bindColumns(RowSetLoader loader) {
+    for (ColumnDefn columnDefn : projectedColumns) {
+      columnDefn.bind(loader);
+    }
+  }
+
+  /**
+   * One projected column: its output name, the {@link PcapColumn} that extracts
+   * the value from a block, and the writer the value is written to.
+   */
+  private static class ColumnDefn {
+
+    private final String name;
+    private final PcapColumn processor;
+    // set by bind(); must be bound before either load() is called
+    private ScalarWriter writer;
+
+    public ColumnDefn(String name, PcapColumn column) {
+      this.name = name;
+      this.processor = column;
+    }
+
+    public String getName() {
+      return name;
+    }
+
+    public PcapColumn getProcessor() {
+      return processor;
+    }
+
+    /** Fetches this column's scalar writer from the loader by column name. */
+    public void bind(RowSetLoader loader) {
+      writer = loader.scalar(getName());
+    }
+
+    /** Adds this column to the schema, as required or nullable depending on its data mode. */
+    public void define(SchemaBuilder builder) {
+      if (getProcessor().getType().getMode() == DataMode.REQUIRED) {
+        builder.add(getName(), getProcessor().getType().getMinorType());
+      } else {
+        builder.addNullable(getName(), getProcessor().getType().getMinorType());
+      }
+    }
+
+    /** Lets the processor extract this column's value from the block and write it. */
+    public void load(IPcapngType block) {
+      getProcessor().process(block, writer);
+    }
+
+    /** Writes a literal string value, used for the file name column. */
+    public void load(String value) {
+      writer.setString(value);
+    }
+  }
+}
\ No newline at end of file
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
index 8ded7ad..7210f93 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatConfig.java
@@ -17,24 +17,42 @@
*/
package org.apache.drill.exec.store.pcapng;
-import com.fasterxml.jackson.annotation.JsonTypeName;
+import java.util.List;
+import java.util.Objects;
import org.apache.drill.common.PlanStringBuilder;
import org.apache.drill.common.logical.FormatPluginConfig;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Objects;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonTypeName;
-@JsonTypeName("pcapng")
+@JsonTypeName(PcapngFormatConfig.NAME)
+@JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class PcapngFormatConfig implements FormatPluginConfig {
- public List<String> extensions = Collections.singletonList("pcapng");
+ public static final String NAME = "pcapng";
+ private final List<String> extensions;
+ private final boolean stat;
+
+ /**
+  * Creates the format config from its JSON properties.
+  *
+  * @param extensions file extensions handled by the format; when null, the
+  *                   single extension {@link #NAME} is used
+  * @param stat       value returned by {@code getStat()}
+  */
+ @JsonCreator
+ public PcapngFormatConfig(@JsonProperty("extensions") List<String> extensions,
+                           @JsonProperty("stat") boolean stat) {
+   if (extensions == null) {
+     this.extensions = ImmutableList.of(PcapngFormatConfig.NAME);
+   } else {
+     // immutable copy, so later changes to the caller's list are not seen
+     this.extensions = ImmutableList.copyOf(extensions);
+   }
+   this.stat = stat;
+ }
+ @JsonProperty("extensions")
public List<String> getExtensions() {
return extensions;
}
+ /** @return the {@code stat} flag this config was created with */
+ @JsonProperty("stat")
+ public boolean getStat() {
+   return stat;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -44,18 +62,16 @@ public class PcapngFormatConfig implements
FormatPluginConfig {
return false;
}
PcapngFormatConfig that = (PcapngFormatConfig) o;
- return Objects.equals(extensions, that.extensions);
+ return Objects.equals(extensions, that.extensions) && Objects.equals(stat,
that.getStat());
}
@Override
public int hashCode() {
- return Objects.hash(extensions);
+ return Objects.hash(extensions, stat);
}
@Override
public String toString() {
- return new PlanStringBuilder(this)
- .field("extensions", extensions)
- .toString();
+ return new PlanStringBuilder(this).field("extensions",
extensions).field("stat", stat).toString();
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
index 41be760..d80e8ac 100644
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
+++
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngFormatPlugin.java
@@ -17,79 +17,76 @@
*/
package org.apache.drill.exec.store.pcapng;
-import java.io.IOException;
-import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.logical.StoragePluginConfig;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.planner.common.DrillStatsTable;
-import org.apache.drill.exec.proto.UserBitShared;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.common.types.Types;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileReaderFactory;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileScanBuilder;
+import
org.apache.drill.exec.physical.impl.scan.file.FileScanFramework.FileSchemaNegotiator;
+import org.apache.drill.exec.physical.impl.scan.framework.ManagedReader;
+import org.apache.drill.exec.proto.UserBitShared.CoreOperatorType;
import org.apache.drill.exec.server.DrillbitContext;
-import org.apache.drill.exec.store.RecordReader;
-import org.apache.drill.exec.store.RecordWriter;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.dfs.easy.EasyFormatPlugin;
-import org.apache.drill.exec.store.dfs.easy.EasyWriter;
-import org.apache.drill.exec.store.dfs.easy.FileWork;
+import org.apache.drill.exec.store.dfs.easy.EasySubScan;
import org.apache.hadoop.conf.Configuration;
-import java.util.List;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
public class PcapngFormatPlugin extends EasyFormatPlugin<PcapngFormatConfig> {
- public static final String DEFAULT_NAME = "pcapng";
-
- public PcapngFormatPlugin(String name, DrillbitContext context,
Configuration fsConf,
- StoragePluginConfig storagePluginConfig) {
- this(name, context, fsConf, storagePluginConfig, new PcapngFormatConfig());
+ /**
+  * Creates the pcapng format plugin. The generic easy-format settings passed
+  * to the superclass are built by {@code easyConfig} from the file system
+  * configuration and the format config.
+  */
+ public PcapngFormatPlugin(String name,
+ DrillbitContext context,
+ Configuration fsConf,
+ StoragePluginConfig storageConfig,
+ PcapngFormatConfig formatConfig) {
+ super(name, easyConfig(fsConf, formatConfig), context, storageConfig,
formatConfig);
}
- public PcapngFormatPlugin(String name, DrillbitContext context,
Configuration fsConf, StoragePluginConfig config, PcapngFormatConfig
formatPluginConfig) {
- super(name, context, fsConf, config, formatPluginConfig, true,
- false, false, true,
- formatPluginConfig.getExtensions(), DEFAULT_NAME);
+ /**
+  * Builds the easy-format settings for pcapng: readable but not writable,
+  * not block splittable, compressible, using the enhanced scan with limit
+  * and project pushdown enabled. File extensions come from the format config.
+  */
+ private static EasyFormatConfig easyConfig(Configuration fsConf,
PcapngFormatConfig pluginConfig) {
+ EasyFormatConfig config = new EasyFormatConfig();
+ config.readable = true;
+ config.writable = false;
+ config.blockSplittable = false;
+ config.compressible = true;
+ config.extensions = pluginConfig.getExtensions();
+ config.fsConf = fsConf;
+ config.readerOperatorType = CoreOperatorType.PCAPNG_SUB_SCAN_VALUE;
+ config.useEnhancedScan = true;
+ config.supportsLimitPushdown = true;
+ config.supportsProjectPushdown = true;
+ config.defaultName = PcapngFormatConfig.NAME;
+ return config;
}
- @Override
- public boolean supportsPushDown() {
- return true;
- }
+ private static class PcapngReaderFactory extends FileReaderFactory {
- @Override
- public RecordReader getRecordReader(FragmentContext context, DrillFileSystem
dfs,
- FileWork fileWork, List<SchemaPath>
columns,
- String userName) {
- return new PcapngRecordReader(fileWork.getPath(), dfs, columns);
- }
+ private final PcapngFormatConfig config;
+ private final EasySubScan scan;
- @Override
- public RecordWriter getRecordWriter(FragmentContext context, EasyWriter
writer) {
- throw new UnsupportedOperationException("unimplemented");
- }
+ public PcapngReaderFactory(PcapngFormatConfig config, EasySubScan scan) {
+ this.config = config;
+ this.scan = scan;
+ }
- @Override
- public int getReaderOperatorType() {
- return UserBitShared.CoreOperatorType.PCAPNG_SUB_SCAN_VALUE;
- }
-
- @Override
- public int getWriterOperatorType() {
- throw new UnsupportedOperationException("unimplemented");
- }
-
- @Override
- public boolean supportsStatistics() {
- return false;
+ @Override
+ public ManagedReader<? extends FileSchemaNegotiator> newReader() {
+ return new PcapngBatchReader(config, scan);
+ }
}
@Override
- public DrillStatsTable.TableStatistics readStatistics(FileSystem fs, Path
statsTablePath) throws IOException {
- return null;
+ public ManagedReader<? extends FileSchemaNegotiator>
newBatchReader(EasySubScan scan, OptionManager options)
+ throws ExecutionSetupException {
+ return new PcapngBatchReader(formatConfig, scan);
}
@Override
- public void writeStatistics(DrillStatsTable.TableStatistics statistics,
FileSystem fs, Path statsTablePath) throws IOException {
+ protected FileScanBuilder frameworkBuilder(OptionManager options,
EasySubScan scan) throws ExecutionSetupException {
+ FileScanBuilder builder = new FileScanBuilder();
+ builder.setReaderFactory(new PcapngReaderFactory(formatConfig, scan));
+ initScanBuilder(builder, scan);
+ builder.nullType(Types.optional(MinorType.VARCHAR));
+ return builder;
}
}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
deleted file mode 100644
index 152e2e6..0000000
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/PcapngRecordReader.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng;
-
-import fr.bmartel.pcapdecoder.PcapDecoder;
-import fr.bmartel.pcapdecoder.structure.types.IPcapngType;
-import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
-import org.apache.commons.io.IOUtils;
-import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.SchemaPath;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.exception.SchemaChangeException;
-import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.ops.OperatorContext;
-import org.apache.drill.exec.physical.impl.OutputMutator;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.store.AbstractRecordReader;
-import org.apache.drill.exec.store.dfs.DrillFileSystem;
-import org.apache.drill.exec.store.pcapng.schema.Column;
-import org.apache.drill.exec.store.pcapng.schema.DummyArrayImpl;
-import org.apache.drill.exec.store.pcapng.schema.DummyImpl;
-import org.apache.drill.exec.store.pcapng.schema.Schema;
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.hadoop.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import java.util.function.BiConsumer;
-
-public class PcapngRecordReader extends AbstractRecordReader {
- private static final Logger logger =
LoggerFactory.getLogger(PcapngRecordReader.class);
-
- // batch size should not exceed max allowed record count
- private static final int BATCH_SIZE = 40_000;
-
- private final Path pathToFile;
- private OutputMutator output;
- private List<ProjectedColumnInfo> projectedCols;
- private DrillFileSystem fs;
- private InputStream in;
- private List<SchemaPath> columns;
-
- private Iterator<IPcapngType> it;
-
- public PcapngRecordReader(final Path pathToFile,
- final DrillFileSystem fileSystem,
- final List<SchemaPath> columns) {
- this.fs = fileSystem;
- this.pathToFile = fs.makeQualified(pathToFile);
- this.columns = columns;
- setColumns(columns);
- }
-
- @Override
- public void setup(final OperatorContext context, final OutputMutator output)
throws ExecutionSetupException {
- try {
-
- this.output = output;
- this.in = fs.openPossiblyCompressedStream(pathToFile);
- PcapDecoder decoder = new PcapDecoder(IOUtils.toByteArray(in));
- decoder.decode();
- this.it = decoder.getSectionList().iterator();
- setupProjection();
- } catch (IOException io) {
- throw UserException.dataReadError(io)
- .addContext("File name:", pathToFile.toUri().getPath())
- .build(logger);
- }
- }
-
- @Override
- public int next() {
- if (isSkipQuery()) {
- return iterateOverBlocks((block, counter) -> {
- });
- } else {
- return iterateOverBlocks((block, counter) ->
putToTable((IEnhancedPacketBLock) block, counter));
- }
- }
-
- private void putToTable(IEnhancedPacketBLock bLock, Integer counter) {
- for (ProjectedColumnInfo pci : projectedCols) {
- pci.getColumn().process(bLock, pci.getVv(), counter);
- }
- }
-
- @Override
- public void close() throws Exception {
- if (in != null) {
- in.close();
- in = null;
- }
- }
-
- private void setupProjection() {
- if (isSkipQuery()) {
- projectedCols = projectNone();
- } else if (isStarQuery()) {
- projectedCols = projectAllCols(Schema.getColumnsNames());
- } else {
- projectedCols = projectCols(columns);
- }
- }
-
- private List<ProjectedColumnInfo> projectNone() {
- List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
- pciBuilder.add(makeColumn("dummy", new DummyImpl()));
- return Collections.unmodifiableList(pciBuilder);
- }
-
- private List<ProjectedColumnInfo> projectAllCols(final Set<String> columns) {
- List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
- for (String colName : columns) {
- pciBuilder.add(makeColumn(colName, Schema.getColumns().get(colName)));
- }
- return Collections.unmodifiableList(pciBuilder);
- }
-
- private List<ProjectedColumnInfo> projectCols(final List<SchemaPath>
columns) {
- List<ProjectedColumnInfo> pciBuilder = new ArrayList<>();
- for (SchemaPath schemaPath : columns) {
- String projectedName = schemaPath.rootName();
- if (schemaPath.isArray()) {
- pciBuilder.add(makeColumn(projectedName, new DummyArrayImpl()));
- } else if (Schema.getColumns().containsKey(projectedName.toLowerCase()))
{
- pciBuilder.add(makeColumn(projectedName,
- Schema.getColumns().get(projectedName.toLowerCase())));
- } else {
- pciBuilder.add(makeColumn(projectedName, new DummyImpl()));
- }
- }
- return Collections.unmodifiableList(pciBuilder);
- }
-
- private ProjectedColumnInfo makeColumn(final String colName, final Column
column) {
- MaterializedField field = MaterializedField.create(colName,
column.getMinorType());
- ValueVector vector = getValueVector(field, output);
- return new ProjectedColumnInfo(vector, column, colName);
- }
-
- private ValueVector getValueVector(final MaterializedField field, final
OutputMutator output) {
- try {
- TypeProtos.MajorType majorType = field.getType();
- final Class<? extends ValueVector> clazz =
TypeHelper.getValueVectorClass(
- majorType.getMinorType(), majorType.getMode());
-
- return output.addField(field, clazz);
- } catch (SchemaChangeException sce) {
- throw UserException.internalError(sce)
- .addContext("The addition of this field is incompatible with this
OutputMutator's capabilities")
- .build(logger);
- }
- }
-
- private Integer iterateOverBlocks(BiConsumer<IPcapngType, Integer> consumer)
{
- int counter = 0;
- while (it.hasNext() && counter < BATCH_SIZE) {
- IPcapngType block = it.next();
- if (block instanceof IEnhancedPacketBLock) {
- consumer.accept(block, counter);
- counter++;
- }
- }
- return counter;
- }
-
- private static class ProjectedColumnInfo {
-
- private ValueVector vv;
- private Column colDef;
- private String columnName;
-
- ProjectedColumnInfo(ValueVector vv, Column colDef, String columnName) {
- this.vv = vv;
- this.colDef = colDef;
- this.columnName = columnName;
- }
-
- public ValueVector getVv() {
- return vv;
- }
-
- Column getColumn() {
- return colDef;
- }
-
- public String getColumnName() {
- return columnName;
- }
- }
-}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java
deleted file mode 100644
index 109b7dd..0000000
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Column.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng.schema;
-
-import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.exec.vector.ValueVector;
-
-public interface Column {
- TypeProtos.MajorType getMinorType();
-
- void process(IEnhancedPacketBLock block, ValueVector vv, int count);
-}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java
deleted file mode 100644
index 2023d19..0000000
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyArrayImpl.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng.schema;
-
-import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class DummyArrayImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.repeated(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
- }
-}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java
deleted file mode 100644
index a8c26a0..0000000
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/DummyImpl.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng.schema;
-
-import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.vector.ValueVector;
-
-public class DummyImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count) {
- }
-}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java
deleted file mode 100644
index a9738bd..0000000
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Schema.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng.schema;
-
-import fr.bmartel.pcapdecoder.structure.types.inter.IEnhancedPacketBLock;
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.store.pcapng.decoder.PacketDecoder;
-import org.apache.drill.exec.vector.ValueVector;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-import static
org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
-import static
org.apache.drill.exec.store.pcapng.schema.Util.setNullableLongColumnValue;
-
-public class Schema {
-
- private final static Map<String, Column> columns = new HashMap<>();
-
- static {
- columns.put("timestamp", new TimestampImpl());
- columns.put("packet_length", new PacketLenImpl());
- columns.put("type", new TypeImpl());
- columns.put("src_ip", new SrcIpImpl());
- columns.put("dst_ip", new DstIpImpl());
- columns.put("src_port", new SrcPortImpl());
- columns.put("dst_port", new DstPortImpl());
- columns.put("src_mac_address", new SrcMacImpl());
- columns.put("dst_mac_address", new DstMacImpl());
- columns.put("tcp_session", new TcpSessionImpl());
- columns.put("tcp_ack", new TcpAckImpl());
- columns.put("tcp_flags", new TcpFlags());
- columns.put("tcp_flags_ns", new TcpFlagsNsImpl());
- columns.put("tcp_flags_cwr", new TcpFlagsCwrImpl());
- columns.put("tcp_flags_ece", new TcpFlagsEceImpl());
- columns.put("tcp_flags_ece_ecn_capable", new TcpFlagsEceEcnCapableImpl());
- columns.put("tcp_flags_ece_congestion_experienced", new
TcpFlagsEceCongestionExperiencedImpl());
- columns.put("tcp_flags_urg", new TcpFlagsUrgIml());
- columns.put("tcp_flags_ack", new TcpFlagsAckImpl());
- columns.put("tcp_flags_psh", new TcpFlagsPshImpl());
- columns.put("tcp_flags_rst", new TcpFlagsRstImpl());
- columns.put("tcp_flags_syn", new TcpFlagsSynImpl());
- columns.put("tcp_flags_fin", new TcpFlagsFinImpl());
- columns.put("tcp_parsed_flags", new TcpParsedFlags());
- columns.put("packet_data", new PacketDataImpl());
- }
-
- public static Map<String, Column> getColumns() {
- return columns;
- }
-
- public static Set<String> getColumnsNames() {
- return columns.keySet();
- }
-
- static class TimestampImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.required(TypeProtos.MinorType.TIMESTAMP);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- Util.setTimestampColumnValue(block.getTimeStamp(), vv, count);
- }
- }
-
- static class PacketLenImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.required(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- Util.setIntegerColumnValue(block.getPacketLength(), vv, count);
- }
- }
-
- static class TypeImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.VARCHAR);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableStringColumnValue(packet.getPacketType(), vv, count);
- }
- }
- }
-
- static class SrcIpImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.VARCHAR);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableStringColumnValue(packet.getSrc_ip().getHostAddress(),
vv, count);
- }
- }
- }
-
- static class DstIpImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.VARCHAR);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableStringColumnValue(packet.getDst_ip().getHostAddress(),
vv, count);
- }
- }
- }
-
- static class SrcPortImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableIntegerColumnValue(packet.getSrc_port(), vv, count);
- }
- }
- }
-
- static class DstPortImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableIntegerColumnValue(packet.getDst_port(), vv, count);
- }
- }
- }
-
- static class SrcMacImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.VARCHAR);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
-
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableStringColumnValue(packet.getEthernetSource(), vv,
count);
- }
- }
- }
-
- static class DstMacImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.VARCHAR);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableStringColumnValue(packet.getEthernetDestination(), vv,
count);
- }
- }
- }
-
- static class TcpSessionImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.BIGINT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- setNullableLongColumnValue(packet.getSessionHash(), vv, count);
- }
- }
- }
-
- static class TcpAckImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableIntegerColumnValue(packet.getAckNumber(), vv, count);
- }
- }
- }
-
- static class TcpFlags implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableIntegerColumnValue(packet.getFlags(), vv, count);
- }
- }
- }
-
- static class TcpFlagsNsImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableBooleanColumnValue((packet.getFlags() & 0x100) != 0,
vv, count);
- }
- }
- }
-
- static class TcpFlagsCwrImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableBooleanColumnValue((packet.getFlags() & 0x80) != 0,
vv, count);
- }
- }
- }
-
- static class TcpFlagsEceImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableBooleanColumnValue((packet.getFlags() & 0x40) != 0,
vv, count);
- }
- }
- }
-
- static class TcpFlagsEceEcnCapableImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x42,
vv, count);
- }
- }
- }
-
- static class TcpFlagsEceCongestionExperiencedImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableBooleanColumnValue((packet.getFlags() & 0x42) == 0x40,
vv, count);
- }
- }
- }
-
- static class TcpFlagsUrgIml implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableBooleanColumnValue((packet.getFlags() & 0x20) != 0,
vv, count);
- }
- }
- }
-
- static class TcpFlagsAckImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableBooleanColumnValue((packet.getFlags() & 0x10) != 0,
vv, count);
- }
- }
- }
-
- static class TcpFlagsPshImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableBooleanColumnValue((packet.getFlags() & 0x8) != 0, vv,
count);
- }
- }
- }
-
- static class TcpFlagsRstImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableBooleanColumnValue((packet.getFlags() & 0x4) != 0, vv,
count);
- }
- }
- }
-
- static class TcpFlagsSynImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableBooleanColumnValue((packet.getFlags() & 0x2) != 0, vv,
count);
- }
- }
- }
-
- static class TcpFlagsFinImpl implements Column {
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.INT);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableBooleanColumnValue((packet.getFlags() & 0x1) != 0, vv,
count);
- }
- }
- }
-
- static class TcpParsedFlags implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.VARCHAR);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
- Util.setNullableStringColumnValue(packet.getParsedFlags(), vv, count);
- }
- }
- }
-
- static class PacketDataImpl implements Column {
- @Override
- public TypeProtos.MajorType getMinorType() {
- return Types.optional(TypeProtos.MinorType.VARCHAR);
- }
-
- @Override
- public void process(IEnhancedPacketBLock block, ValueVector vv, int count)
{
- PacketDecoder packet = new PacketDecoder();
- if (packet.readPcapng(block.getPacketData())) {
-
Util.setNullableStringColumnValue(parseBytesToASCII(block.getPacketData()), vv,
count);
- }
- }
- }
-}
diff --git
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java
deleted file mode 100644
index 06e8e6a..0000000
---
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcapng/schema/Util.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng.schema;
-
-import org.apache.drill.exec.vector.IntVector;
-import org.apache.drill.exec.vector.NullableBigIntVector;
-import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
-import org.apache.drill.exec.vector.TimeStampVector;
-import org.apache.drill.exec.vector.ValueVector;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-
-public class Util {
- static void setNullableIntegerColumnValue(final int data, final ValueVector
vv, final int count) {
- ((NullableIntVector.Mutator) vv.getMutator())
- .setSafe(count, data);
- }
-
- static void setIntegerColumnValue(final int data, final ValueVector vv,
final int count) {
- ((IntVector.Mutator) vv.getMutator())
- .setSafe(count, data);
- }
-
- static void setTimestampColumnValue(final long data, final ValueVector vv,
final int count) {
- ((TimeStampVector.Mutator) vv.getMutator())
- .setSafe(count, data / 1000);
- }
-
- static void setNullableLongColumnValue(final long data, final ValueVector
vv, final int count) {
- ((NullableBigIntVector.Mutator) vv.getMutator())
- .setSafe(count, data);
- }
-
- static void setNullableStringColumnValue(final String data, final
ValueVector vv, final int count) {
- ((NullableVarCharVector.Mutator) vv.getMutator())
- .setSafe(count, data.getBytes(UTF_8), 0, data.length());
- }
-
- static void setNullableBooleanColumnValue(final boolean data, final
ValueVector vv, final int count) {
- ((NullableIntVector.Mutator) vv.getMutator())
- .setSafe(count, data ? 1 : 0);
- }
-}
\ No newline at end of file
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
index 871233b..92a91aab 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java
@@ -58,8 +58,14 @@ public class TestFormatPluginOptionExtractor extends
BaseTest {
assertEquals(d.typeName, "(type: String, autoCorrectCorruptDates:
boolean, enableStringsSignedMinMax: boolean)", d.presentParams());
break;
case "json":
+ assertEquals(d.typeName, "(type: String)", d.presentParams());
+ break;
case "sequencefile":
+ assertEquals(d.typeName, "(type: String)", d.presentParams());
+ break;
case "pcapng":
+ assertEquals(d.typeName, "(type: String, stat: boolean)",
d.presentParams());
+ break;
case "avro":
assertEquals(d.typeName, "(type: String)", d.presentParams());
break;
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java
deleted file mode 100644
index 6228766..0000000
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngHeaders.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.store.pcapng;
-
-import org.apache.drill.common.types.TypeProtos;
-import org.apache.drill.common.types.Types;
-import org.apache.drill.exec.record.MaterializedField;
-import org.apache.drill.exec.record.metadata.TupleMetadata;
-import org.apache.drill.exec.record.metadata.TupleSchema;
-import org.apache.drill.test.ClusterFixture;
-import org.apache.drill.test.ClusterTest;
-import org.apache.drill.exec.physical.rowSet.RowSet;
-import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
-import org.apache.drill.test.rowSet.RowSetComparison;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.nio.file.Paths;
-
-public class TestPcapngHeaders extends ClusterTest {
- @BeforeClass
- public static void setupTestFiles() throws Exception {
- startCluster(ClusterFixture.builder(dirTestWatcher).maxParallelization(1));
- dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng"));
- }
-
- @Test
- public void testValidHeadersForStarQuery() throws IOException {
- String query = "select * from dfs.`store/pcapng/sniff.pcapng`";
- RowSet actual = client.queryBuilder().sql(query).rowSet();
-
- TupleMetadata expectedSchema = new TupleSchema();
-
- expectedSchema.add(MaterializedField.create("tcp_flags_ece_ecn_capable",
Types.optional(TypeProtos.MinorType.INT)));
-
expectedSchema.add(MaterializedField.create("tcp_flags_ece_congestion_experienced",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("tcp_flags_psh",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("type",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("tcp_flags_cwr",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("dst_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("src_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("tcp_flags_fin",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("tcp_flags_ece",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("tcp_flags",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("tcp_flags_ack",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("src_mac_address",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("tcp_flags_syn",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("tcp_flags_rst",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("timestamp",
Types.required(TypeProtos.MinorType.TIMESTAMP)));
- expectedSchema.add(MaterializedField.create("tcp_session",
Types.optional(TypeProtos.MinorType.BIGINT)));
- expectedSchema.add(MaterializedField.create("packet_data",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("tcp_parsed_flags",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("tcp_flags_ns",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("src_port",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("packet_length",
Types.required(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("tcp_flags_urg",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("tcp_ack",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("dst_port",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("dst_mac_address",
Types.optional(TypeProtos.MinorType.VARCHAR)));
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(actual);
- }
-
- @Test
- public void testValidHeadersForProjection() throws IOException {
- String query = "select sRc_ip, dst_IP, dst_mAc_address, src_Port,
tcp_session, `Timestamp` from dfs.`store/pcapng/sniff.pcapng`";
- RowSet actual = client.queryBuilder().sql(query).rowSet();
-
- TupleMetadata expectedSchema = new TupleSchema();
-
- expectedSchema.add(MaterializedField.create("sRc_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("dst_IP",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("dst_mAc_address",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("src_Port",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("tcp_session",
Types.optional(TypeProtos.MinorType.BIGINT)));
- expectedSchema.add(MaterializedField.create("Timestamp",
Types.required(TypeProtos.MinorType.TIMESTAMP)));
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(actual);
- }
-
- @Test
- public void testValidHeadersForMissColumns() throws IOException {
- String query = "select `timestamp`, `name`, `color` from
dfs.`store/pcapng/sniff.pcapng`";
- RowSet actual = client.queryBuilder().sql(query).rowSet();
-
- TupleMetadata expectedSchema = new TupleSchema();
-
- expectedSchema.add(MaterializedField.create("timestamp",
Types.required(TypeProtos.MinorType.TIMESTAMP)));
- expectedSchema.add(MaterializedField.create("name",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("color",
Types.optional(TypeProtos.MinorType.INT)));
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(actual);
- }
-
- @Test
- public void testMixColumns() throws IOException {
- String query = "select src_ip, dst_ip, dst_mac_address, src_port,
tcp_session, `timestamp` from dfs.`store/pcapng/sniff.pcapng`";
- RowSet actual = client.queryBuilder().sql(query).rowSet();
-
- TupleMetadata expectedSchema = new TupleSchema();
-
- expectedSchema.add(MaterializedField.create("sRc_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("dst_IP",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("dst_mAc_address",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("src_Port",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("tcp_session",
Types.optional(TypeProtos.MinorType.BIGINT)));
- expectedSchema.add(MaterializedField.create("Timestamp",
Types.required(TypeProtos.MinorType.TIMESTAMP)));
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(actual);
-
- String queryWithDiffOrder = "select `timestamp`, src_ip, dst_ip, src_port,
tcp_session, dst_mac_address from dfs.`store/pcapng/sniff.pcapng`";
- actual = client.queryBuilder().sql(queryWithDiffOrder).rowSet();
-
- expectedSchema = new TupleSchema();
-
- expectedSchema.add(MaterializedField.create("timestamp",
Types.required(TypeProtos.MinorType.TIMESTAMP)));
- expectedSchema.add(MaterializedField.create("src_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("dst_ip",
Types.optional(TypeProtos.MinorType.VARCHAR)));
- expectedSchema.add(MaterializedField.create("src_port",
Types.optional(TypeProtos.MinorType.INT)));
- expectedSchema.add(MaterializedField.create("tcp_session",
Types.optional(TypeProtos.MinorType.BIGINT)));
- expectedSchema.add(MaterializedField.create("dst_mac_address",
Types.optional(TypeProtos.MinorType.VARCHAR)));
-
- expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(actual);
- }
-
- @Test
- public void testValidHeaderForArrayColumns() throws IOException {
- // query with non-existent field
- String query = "select arr[3] as arr from dfs.`store/pcapng/sniff.pcapng`";
- RowSet actual = client.queryBuilder().sql(query).rowSet();
-
- TupleMetadata expectedSchema = new TupleSchema();
-
- expectedSchema.add(MaterializedField.create("arr",
Types.optional(TypeProtos.MinorType.INT)));
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(actual);
-
- // query with an existent field which doesn't support arrays
- query = "select type[45] as arr from dfs.`store/pcapng/sniff.pcapng`";
-
- expectedSchema = new TupleSchema();
- actual = client.queryBuilder().sql(query).rowSet();
-
- expectedSchema.add(MaterializedField.create("arr",
Types.optional(TypeProtos.MinorType.INT)));
-
- expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(actual);
- }
-
- @Test
- public void testValidHeaderForNestedColumns() throws IOException {
- // query with non-existent field
- String query = "select top['nested'] as nested from
dfs.`store/pcapng/sniff.pcapng`";
- RowSet actual = client.queryBuilder().sql(query).rowSet();
-
- TupleMetadata expectedSchema = new TupleSchema();
-
- expectedSchema.add(MaterializedField.create("nested",
Types.optional(TypeProtos.MinorType.INT)));
-
- RowSet expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(actual);
-
- // query with an existent field which doesn't support nesting
- query = "select type['nested'] as nested from
dfs.`store/pcapng/sniff.pcapng`";
-
- expectedSchema = new TupleSchema();
- actual = client.queryBuilder().sql(query).rowSet();
-
- expectedSchema.add(MaterializedField.create("nested",
Types.optional(TypeProtos.MinorType.INT)));
-
- expected = new RowSetBuilder(client.allocator(), expectedSchema)
- .build();
- new RowSetComparison(expected)
- .verifyAndClearAll(actual);
- }
-}
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
index 98d7b67..d0c4efc 100644
---
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngRecordReader.java
@@ -17,84 +17,201 @@
*/
package org.apache.drill.exec.store.pcapng;
-import org.apache.drill.PlanTestBase;
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+
+import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.UserRemoteException;
-import org.junit.Assert;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.QueryTestUtil;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.joda.time.Instant;
import org.junit.BeforeClass;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
-import java.nio.file.Paths;
+@Category(RowSetTests.class)
+public class TestPcapngRecordReader extends ClusterTest {
-public class TestPcapngRecordReader extends PlanTestBase {
@BeforeClass
- public static void setupTestFiles() {
+ public static void setup() throws Exception {
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng"));
}
@Test
public void testStarQuery() throws Exception {
- Assert.assertEquals(123, testSql("select * from
dfs.`store/pcapng/sniff.pcapng`"));
- Assert.assertEquals(1, testSql("select * from
dfs.`store/pcapng/example.pcapng`"));
+ String sql = "select * from dfs.`store/pcapng/sniff.pcapng`";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ assertEquals(123, sets.rowCount());
+ sets.clear();
+ }
+
+ @Test
+ public void testExplicitQuery() throws Exception {
+ String sql = "select type, packet_length, `timestamp` from
dfs.`store/pcapng/sniff.pcapng` where type = 'ARP'";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("type", MinorType.VARCHAR)
+ .add("packet_length", MinorType.INT)
+ .add("timestamp", MinorType.TIMESTAMP)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow("ARP", 90, Instant.ofEpochMilli(1518010669927L))
+ .addRow("ARP", 90, Instant.ofEpochMilli(1518010671874L))
+ .build();
+
+ assertEquals(2, sets.rowCount());
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+
+ @Test
+ public void testLimitPushdown() throws Exception {
+ String sql = "select * from dfs.`store/pcapng/sniff.pcapng` where type =
'UDP' limit 10 offset 65";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ assertEquals(6, sets.rowCount());
+ sets.clear();
}
@Test
- public void testProjectingByName() throws Exception {
- Assert.assertEquals(123, testSql("select `timestamp`, packet_data, type
from dfs.`store/pcapng/sniff.pcapng`"));
- Assert.assertEquals(1, testSql("select src_ip, dst_ip, `timestamp` from
dfs.`store/pcapng/example.pcapng`"));
+ public void testSerDe() throws Exception { // Obtains the JSON plan for a count(*) query, then submits that plan and checks the single returned value.
+ String sql = "select count(*) from dfs.`store/pcapng/example.pcapng`";
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+
+ assertEquals("Counts should match", 1, cnt); // example.pcapng is expected to hold exactly one row
}
@Test
- public void testDiffCaseQuery() throws Exception {
- Assert.assertEquals(123, testSql("select `timestamp`, paCket_dAta, TyPe
from dfs.`store/pcapng/sniff.pcapng`"));
- Assert.assertEquals(1, testSql("select src_ip, dst_ip, `Timestamp` from
dfs.`store/pcapng/example.pcapng`"));
+ public void testExplicitQueryWithCompressedFile() throws Exception { // Same projection and expected rows as testExplicitQuery, but the query targets sniff.pcapng.zip.
+ QueryTestUtil.generateCompressedFile("store/pcapng/sniff.pcapng", "zip",
"store/pcapng/sniff.pcapng.zip"); // NOTE(review): presumably writes the zip copy that the query below reads - confirm against QueryTestUtil
+ String sql = "select type, packet_length, `timestamp` from
dfs.`store/pcapng/sniff.pcapng.zip` where type = 'ARP'";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("type", MinorType.VARCHAR)
+ .add("packet_length", MinorType.INT)
+ .add("timestamp", MinorType.TIMESTAMP)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema) // timestamps are given as epoch milliseconds
+ .addRow("ARP", 90, Instant.ofEpochMilli(1518010669927L))
+ .addRow("ARP", 90, Instant.ofEpochMilli(1518010671874L))
+ .build();
+
+ assertEquals(2, sets.rowCount());
+ new RowSetComparison(expected).verifyAndClearAll(sets);
}
@Test
- public void testProjectingMissColls() throws Exception {
- Assert.assertEquals(123, testSql("select `timestamp`, `name`, `color` from
dfs.`store/pcapng/sniff.pcapng`"));
- Assert.assertEquals(1, testSql("select src_ip, `time` from
dfs.`store/pcapng/example.pcapng`"));
+ public void testCaseInsensitiveQuery() throws Exception { // Column names are written in mixed case (paCket_dAta, TyPe); only the row count is asserted.
+ String sql = "select `timestamp`, paCket_dAta, TyPe from
dfs.`store/pcapng/sniff.pcapng`";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ assertEquals(123, sets.rowCount());
+ sets.clear();
}
+ @Test
+ public void testWhereSyntaxQuery() throws Exception { // Filters on src_ip and compares the result with three identical expected UDP rows.
+ String sql = "select type, src_ip, dst_ip, packet_length from
dfs.`store/pcapng/sniff.pcapng` where src_ip= '10.2.15.239'";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ TupleMetadata schema = new SchemaBuilder() // expected schema: the three VARCHAR columns are nullable, packet_length is required
+ .addNullable("type", MinorType.VARCHAR)
+ .addNullable("src_ip", MinorType.VARCHAR)
+ .addNullable("dst_ip", MinorType.VARCHAR)
+ .add("packet_length", MinorType.INT)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema)
+ .addRow("UDP", "10.2.15.239", "239.255.255.250", 214)
+ .addRow("UDP", "10.2.15.239", "239.255.255.250", 214)
+ .addRow("UDP", "10.2.15.239", "239.255.255.250", 214)
+ .build();
+
+ assertEquals(3, sets.rowCount());
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
@Test
- public void testCountQuery() throws Exception {
- testBuilder()
- .sqlQuery("select count(*) as ct from dfs.`store/pcapng/sniff.pcapng`")
- .ordered()
- .baselineColumns("ct")
- .baselineValues(123L)
- .build()
- .run();
-
- testBuilder()
- .sqlQuery("select count(*) as ct from
dfs.`store/pcapng/example.pcapng`")
- .ordered()
- .baselineColumns("ct")
- .baselineValues(1L)
- .build()
- .run();
+ public void testValidHeaders() throws Exception { // Declares the expected column names, types and nullability for a star query over sniff.pcapng.
+ String sql = "select * from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet sets = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata schema = new SchemaBuilder() // only timestamp and packet_length are required; every other column is nullable
+ .add("timestamp", MinorType.TIMESTAMP)
+ .add("packet_length", MinorType.INT)
+ .addNullable("type", MinorType.VARCHAR)
+ .addNullable("src_ip", MinorType.VARCHAR)
+ .addNullable("dst_ip", MinorType.VARCHAR)
+ .addNullable("src_port", MinorType.INT)
+ .addNullable("dst_port", MinorType.INT)
+ .addNullable("src_mac_address", MinorType.VARCHAR)
+ .addNullable("dst_mac_address", MinorType.VARCHAR)
+ .addNullable("tcp_session", MinorType.BIGINT)
+ .addNullable("tcp_ack", MinorType.INT)
+ .addNullable("tcp_flags", MinorType.INT)
+ .addNullable("tcp_flags_ns", MinorType.INT)
+ .addNullable("tcp_flags_cwr", MinorType.INT)
+ .addNullable("tcp_flags_ece", MinorType.INT)
+ .addNullable("tcp_flags_ece_ecn_capable", MinorType.INT)
+ .addNullable("tcp_flags_ece_congestion_experienced", MinorType.INT)
+ .addNullable("tcp_flags_urg", MinorType.INT)
+ .addNullable("tcp_flags_ack", MinorType.INT)
+ .addNullable("tcp_flags_psh", MinorType.INT)
+ .addNullable("tcp_flags_rst", MinorType.INT)
+ .addNullable("tcp_flags_syn", MinorType.INT)
+ .addNullable("tcp_flags_fin", MinorType.INT)
+ .addNullable("tcp_parsed_flags", MinorType.VARCHAR)
+ .addNullable("packet_data", MinorType.VARCHAR)
+ .build(); // NOTE(review): the other tests in this class end with buildSchema() - confirm build() is equivalent here
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema).build(); // NOTE(review): expected has no rows while testStarQuery gets 123 rows for the same query - confirm the comparison is meant to be schema-only
+ new RowSetComparison(expected).verifyAndClearAll(sets);
}
@Test
public void testGroupBy() throws Exception {
- Assert.assertEquals(47, testSql("select src_ip, count(1),
sum(packet_length) from dfs.`store/pcapng/sniff.pcapng` group by src_ip"));
+ String sql = "select src_ip, count(1), sum(packet_length) from
dfs.`store/pcapng/sniff.pcapng` group by src_ip";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ assertEquals(47, sets.rowCount()); // same expectation as the removed testSql check: 47 groups by src_ip
+ sets.clear();
}
@Test
public void testDistinctQuery() throws Exception {
- Assert.assertEquals(119, testSql("select distinct `timestamp`, src_ip from
dfs.`store/pcapng/sniff.pcapng`"));
- Assert.assertEquals(1, testSql("select distinct packet_data from
dfs.`store/pcapng/example.pcapng`"));
+ String sql = "select distinct `timestamp`, src_ip from
dfs.`store/pcapng/sniff.pcapng`";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ assertEquals(119, sets.rowCount()); // same 119-row expectation as before; the distinct packet_data check on example.pcapng is dropped
+ sets.clear();
}
@Test(expected = UserRemoteException.class)
public void testBasicQueryWithIncorrectFileName() throws Exception {
- testSql("select * from dfs.`store/pcapng/snaff.pcapng`");
- }
-
- @Test
- public void testPhysicalPlanExecutionBasedOnQuery() throws Exception {
- String query = "EXPLAIN PLAN for select * from
dfs.`store/pcapng/sniff.pcapng`";
- String plan = getPlanInString(query, JSON_FORMAT);
- Assert.assertEquals(123, testPhysical(plan));
+ String sql = "select * from dfs.`store/pcapng/drill.pcapng`"; // NOTE(review): presumably no drill.pcapng exists among the copied resources - confirm
+ client.queryBuilder().sql(sql).rowSet(); // no assertion here: the test passes only if this call throws UserRemoteException
}
-}
+}
\ No newline at end of file
diff --git
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java
new file mode 100644
index 0000000..2dbdda7
--- /dev/null
+++
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcapng/TestPcapngStatRecordReader.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.pcapng;
+
+import static org.junit.Assert.assertEquals;
+
+import java.nio.file.Paths;
+
+import org.apache.drill.categories.RowSetTests;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.physical.rowSet.RowSetBuilder;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.QueryBuilder;
+import org.apache.drill.test.rowSet.RowSetComparison;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(RowSetTests.class)
+public class TestPcapngStatRecordReader extends ClusterTest { // Query tests that run with the pcapng format defined as PcapngFormatConfig(null, true).
+
+ @BeforeClass
+ public static void setup() throws Exception { // Starts the cluster, defines the pcapng format on dfs, then copies the store/pcapng resources.
+ ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher));
+ cluster.defineFormat("dfs", "pcapng", new PcapngFormatConfig(null, true)); // NOTE(review): the true flag presumably enables the stat mode this class is named after - confirm against PcapngFormatConfig
+ dirTestWatcher.copyResourceToRoot(Paths.get("store", "pcapng"));
+ }
+
+ @Test
+ public void testStarQuery() throws Exception { // A star query over example.pcapng is expected to return three rows under this format config.
+ String sql = "select * from dfs.`store/pcapng/example.pcapng`";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ assertEquals(3, sets.rowCount());
+ sets.clear();
+ }
+
+ @Test
+ public void testExplicitQuery() throws Exception { // Projects five columns from sniff.pcapng and compares the result with three hand-built rows.
+ String sql = "select path, shb_hardware, shb_os, if_name, isb_ifrecv from
dfs.`store/pcapng/sniff.pcapng`";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ TupleMetadata schema = new SchemaBuilder() // all five projected columns are declared nullable
+ .addNullable("path", MinorType.VARCHAR)
+ .addNullable("shb_hardware", MinorType.VARCHAR)
+ .addNullable("shb_os", MinorType.VARCHAR)
+ .addNullable("if_name", MinorType.VARCHAR)
+ .addNullable("isb_ifrecv", MinorType.BIGINT)
+ .buildSchema();
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema) // each expected row fills a different column group: shb_* first, then if_name, then isb_ifrecv
+ .addRow("sniff.pcapng", "Intel(R) Core(TM) i7-6700HQ CPU @ 2.60GHz
(with SSE4.2)",
+ "Mac OS X 10.13.3, build 17D47 (Darwin 17.4.0)", null, null)
+ .addRow("sniff.pcapng", null, null, "en0", null)
+ .addRow("sniff.pcapng", null, null, null, 123)
+ .build();
+
+ assertEquals(3, sets.rowCount());
+ new RowSetComparison(expected).verifyAndClearAll(sets); // NOTE(review): going by its name this also clears both row sets - confirm
+ }
+
+ @Test
+ public void testLimitPushdown() throws Exception { // Only the row count is asserted; the query plan itself is not inspected.
+ String sql = "select * from dfs.`store/pcapng/example.pcapng` limit 2";
+ QueryBuilder builder = client.queryBuilder().sql(sql);
+ RowSet sets = builder.rowSet();
+
+ assertEquals(2, sets.rowCount());
+ sets.clear();
+ }
+
+ @Test
+ public void testSerDe() throws Exception { // Obtains the JSON plan for a count(*) query, then submits that plan and checks the single returned value.
+ String sql = "select count(*) from dfs.`store/pcapng/*.pcapng`";
+ String plan = queryBuilder().sql(sql).explainJson();
+ long cnt = queryBuilder().physical(plan).singletonLong();
+
+ assertEquals("Counts should match", 6, cnt); // NOTE(review): 6 is presumably 3 rows each from example.pcapng and sniff.pcapng - confirm which files match the glob
+ }
+
+ @Test
+ public void testValidHeaders() throws Exception { // Declares the expected column names and types for a star query over sniff.pcapng; every column is nullable.
+ String sql = "select * from dfs.`store/pcapng/sniff.pcapng`";
+ RowSet sets = client.queryBuilder().sql(sql).rowSet();
+
+ TupleMetadata schema = new SchemaBuilder()
+ .addNullable("path", MinorType.VARCHAR)
+ .addNullable("shb_hardware", MinorType.VARCHAR)
+ .addNullable("shb_os", MinorType.VARCHAR)
+ .addNullable("shb_userappl", MinorType.VARCHAR)
+ .addNullable("if_name", MinorType.VARCHAR)
+ .addNullable("if_description", MinorType.VARCHAR)
+ .addNullable("if_ipv4addr", MinorType.VARCHAR)
+ .addNullable("if_ipv6addr", MinorType.VARCHAR)
+ .addNullable("if_macaddr", MinorType.VARCHAR)
+ .addNullable("if_euiaddr", MinorType.VARCHAR)
+ .addNullable("if_speed", MinorType.INT)
+ .addNullable("if_tsresol", MinorType.INT)
+ .addNullable("if_tzone", MinorType.INT)
+ .addNullable("if_os", MinorType.VARCHAR)
+ .addNullable("if_fcslen", MinorType.INT)
+ .addNullable("if_tsoffset", MinorType.INT)
+ .addNullable("ns_dnsname", MinorType.VARCHAR)
+ .addNullable("ns_dnsip4addr", MinorType.VARCHAR)
+ .addNullable("ns_dnsip6addr", MinorType.VARCHAR)
+ .addNullable("isb_starttime", MinorType.TIMESTAMP)
+ .addNullable("isb_endtime", MinorType.TIMESTAMP)
+ .addNullable("isb_ifrecv", MinorType.BIGINT)
+ .addNullable("isb_ifdrop", MinorType.BIGINT)
+ .addNullable("isb_filteraccept", MinorType.BIGINT)
+ .addNullable("isb_osdrop", MinorType.BIGINT)
+ .addNullable("isb_usrdeliv", MinorType.BIGINT)
+ .build(); // NOTE(review): testExplicitQuery ends its schema with buildSchema() - confirm build() is equivalent here
+
+ RowSet expected = new RowSetBuilder(client.allocator(), schema).build(); // NOTE(review): expected has no rows while testExplicitQuery gets 3 rows from the same file - confirm the comparison is meant to be schema-only
+ new RowSetComparison(expected).verifyAndClearAll(sets);
+ }
+}
\ No newline at end of file