DRILL-6190: Fix handling of packets longer than legally allowed

closes #1133


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/57e5ab26
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/57e5ab26
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/57e5ab26

Branch: refs/heads/master
Commit: 57e5ab26659e1fa393f8963435de946e0da949be
Parents: 3bc4e31
Author: Ted Dunning <[email protected]>
Authored: Wed Jan 10 16:52:53 2018 -0800
Committer: Arina Ielchiieva <[email protected]>
Committed: Sat Mar 3 19:47:40 2018 +0200

----------------------------------------------------------------------
 .../drill/exec/store/pcap/PcapRecordReader.java | 43 +++++++++++++-------
 .../store/pcap/decoder/PacketConstants.java     |  2 +-
 .../exec/store/pcap/decoder/PacketDecoder.java  | 15 ++++++-
 .../drill/exec/store/pcap/TestPcapDecoder.java  | 13 +++---
 .../exec/store/pcap/TestPcapRecordReader.java   |  5 ++-
 5 files changed, 52 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/57e5ab26/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
index a20e1de..26e1e65 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
@@ -56,8 +56,9 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static 
org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII;
 
 public class PcapRecordReader extends AbstractRecordReader {
-  private static final Logger logger = 
LoggerFactory.getLogger(PcapRecordReader.class);
+  static final int BUFFER_SIZE = 500_000;  // this should be relatively large 
relative to max packet
 
+  private static final Logger logger = 
LoggerFactory.getLogger(PcapRecordReader.class);
   private static final int BATCH_SIZE = 40_000;
 
   private OutputMutator output;
@@ -101,11 +102,10 @@ public class PcapRecordReader extends 
AbstractRecordReader {
   @Override
   public void setup(final OperatorContext context, final OutputMutator output) 
throws ExecutionSetupException {
     try {
-
       this.output = output;
-      this.buffer = new byte[100000];
       this.in = fs.open(pathToFile);
       this.decoder = new PacketDecoder(in);
+      this.buffer = new byte[BUFFER_SIZE + decoder.getMaxLength()];
       this.validBytes = in.read(buffer);
       this.projectedCols = getProjectedColsIfItNull();
       setColumns(projectedColumns);
@@ -197,20 +197,33 @@ public class PcapRecordReader extends 
AbstractRecordReader {
   private int parsePcapFilesAndPutItToTable() throws IOException {
     Packet packet = new Packet();
     int counter = 0;
-    while (offset < validBytes && counter != BATCH_SIZE) {
-
-      if (validBytes - offset < 9000) {
-        System.arraycopy(buffer, offset, buffer, 0, validBytes - offset);
-        validBytes = validBytes - offset;
-        offset = 0;
-
-        int n = in.read(buffer, validBytes, buffer.length - validBytes);
-        if (n > 0) {
-          validBytes += n;
+    while (offset < validBytes && counter < BATCH_SIZE) {
+      if (validBytes - offset < decoder.getMaxLength()) {
+        if (validBytes == buffer.length) {
+          // shift data and read more. This is the common case.
+          System.arraycopy(buffer, offset, buffer, 0, validBytes - offset);
+          validBytes = validBytes - offset;
+          offset = 0;
+
+          int n = in.read(buffer, validBytes, buffer.length - validBytes);
+          if (n > 0) {
+            validBytes += n;
+          }
+          logger.info("read {} bytes, at {} offset", n, validBytes);
+        } else {
+          // near the end of the file, we will just top up the buffer without 
shifting
+          int n = in.read(buffer, offset, buffer.length - offset);
+          if (n > 0) {
+            validBytes = validBytes + n;
+            logger.info("Topped up buffer with {} bytes to yield {}\n", n, 
validBytes);
+          }
         }
       }
-
-      offset = decoder.decodePacket(buffer, offset, packet);
+      int old = offset;
+      offset = decoder.decodePacket(buffer, offset, packet, 
decoder.getMaxLength(), validBytes);
+      if (offset > validBytes) {
+        logger.error("Invalid packet at offset {}", old);
+      }
 
       if (addDataToTable(packet, decoder.getNetwork(), counter)) {
         counter++;

http://git-wip-us.apache.org/repos/asf/drill/blob/57e5ab26/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
index 003f87e..2c87623 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java
@@ -20,7 +20,7 @@ package org.apache.drill.exec.store.pcap.decoder;
 @SuppressWarnings("WeakerAccess")
 public final class PacketConstants {
 
-  public static final int PCAP_HEADER_SIZE = 4 * 4;
+  public static final int PCAP_HEADER_SIZE = 4 * 4;   // packet header, not 
file header
 
   public static final int TIMESTAMP_OFFSET = 0;
   public static final int TIMESTAMP_MICRO_OFFSET = 4;

http://git-wip-us.apache.org/repos/asf/drill/blob/57e5ab26/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
index 704c3fd..7460aaa 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.pcap.decoder;
 
 import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -58,6 +60,7 @@ public class PacketDecoder {
   private static final int PCAP_MAGIC_LITTLE_ENDIAN = 0xD4C3B2A1;
   private static final int PCAP_MAGIC_NUMBER = 0xA1B2C3D4;
 
+  private static final Logger logger = 
LoggerFactory.getLogger(PacketDecoder.class);
 
   private final int maxLength;
   private final int network;
@@ -89,8 +92,16 @@ public class PacketDecoder {
     network = getIntFileOrder(bigEndian, globalHeader, 20);
   }
 
-  public int decodePacket(final byte[] buffer, final int offset, Packet p) {
-    return p.decodePcap(buffer, offset, bigEndian, maxLength);
+  public final int getMaxLength() {
+    return maxLength;
+  }
+
+  public int decodePacket(final byte[] buffer, final int offset, Packet p, int 
maxPacket, int validBytes) {
+    int r = p.decodePcap(buffer, offset, bigEndian, Math.min(maxPacket, 
validBytes - offset));
+    if (r > validBytes) {
+      logger.error("Invalid packet at offset {}", offset);
+    }
+    return r;
   }
 
   public Packet packet() {

http://git-wip-us.apache.org/repos/asf/drill/blob/57e5ab26/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
index 6e58ccf..c8cedc1 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java
@@ -58,12 +58,12 @@ public class TestPcapDecoder extends BaseTestQuery {
     int offset = 0;
 
 
-    byte[] buffer = new byte[100000];
+    byte[] buffer = new byte[PcapRecordReader.BUFFER_SIZE + pd.getMaxLength()];
     int validBytes = in.read(buffer);
     assertTrue(validBytes > 50);
 
-    offset = pd.decodePacket(buffer, offset, p);
-    offset = pd.decodePacket(buffer, offset, p);
+    offset = pd.decodePacket(buffer, offset, p, pd.getMaxLength(), validBytes);
+    offset = pd.decodePacket(buffer, offset, p, pd.getMaxLength(), validBytes);
     assertEquals(228, offset);
 
     assertEquals("FE:00:00:00:00:02", p.getEthernetDestination());
@@ -104,6 +104,7 @@ public class TestPcapDecoder extends BaseTestQuery {
   // the code from here down is useful in that it tests the assumptions that
   // the entire package is based on, but it doesn't really define tests.
   // As such, it can be run as a main class, but isn't supported as unit tests.
+
   /**
    * This tests the speed when creating an actual object for each packet.
    * <p>
@@ -163,7 +164,7 @@ public class TestPcapDecoder extends BaseTestQuery {
     PacketDecoder pd = new PacketDecoder(in);
     Packet p = pd.packet();
 
-    byte[] buffer = new byte[100000];
+    byte[] buffer = new byte[PcapRecordReader.BUFFER_SIZE + pd.getMaxLength()];
     int validBytes = in.read(buffer);
 
     int offset = 0;
@@ -176,7 +177,7 @@ public class TestPcapDecoder extends BaseTestQuery {
       // get new data and shift current data to beginning of buffer if there 
is any danger
       // of straddling the buffer end in the next packet
       // even with jumbo packets this should be enough space to guarantee 
parsing
-      if (validBytes - offset < 9000) {
+      if (validBytes - offset < pd.getMaxLength()) {
         System.arraycopy(buffer, 0, buffer, offset, validBytes - offset);
         validBytes = validBytes - offset;
         offset = 0;
@@ -188,7 +189,7 @@ public class TestPcapDecoder extends BaseTestQuery {
       }
 
       // decode the packet as it lies
-      offset = pd.decodePacket(buffer, offset, p);
+      offset = pd.decodePacket(buffer, offset, p, pd.getMaxLength(), 
validBytes);
       total += p.getPacketLength();
       allCount++;
       if (p.isTcpPacket()) {

http://git-wip-us.apache.org/repos/asf/drill/blob/57e5ab26/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
index 706694d..bb81469 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
@@ -48,7 +48,8 @@ public class TestPcapRecordReader extends BaseTestQuery {
 
   @Test
   public void testDistinctQuery() throws Exception {
-    runSQLVerifyCount("select distinct * from dfs.`store/pcap/tcp-1.pcap`", 1);
+    // omit data field from distinct count for now
+    runSQLVerifyCount("select distinct type, network, `timestamp`, src_ip, 
dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, 
packet_length from dfs.`store/pcap/tcp-1.pcap`", 1);
   }
 
   private void runSQLVerifyCount(String sql, int expectedRowCount) throws 
Exception {
@@ -62,7 +63,7 @@ public class TestPcapRecordReader extends BaseTestQuery {
 
   private void printResultAndVerifyRowCount(List<QueryDataBatch> results,
                                             int expectedRowCount) throws 
SchemaChangeException {
-    setColumnWidth(25);
+    setColumnWidth(35);
     int rowCount = printResult(results);
     if (expectedRowCount != -1) {
       Assert.assertEquals(expectedRowCount, rowCount);

Reply via email to