DRILL-6190: Fix handling of packets longer than legally allowed closes #1133
Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/57e5ab26 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/57e5ab26 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/57e5ab26 Branch: refs/heads/master Commit: 57e5ab26659e1fa393f8963435de946e0da949be Parents: 3bc4e31 Author: Ted Dunning <[email protected]> Authored: Wed Jan 10 16:52:53 2018 -0800 Committer: Arina Ielchiieva <[email protected]> Committed: Sat Mar 3 19:47:40 2018 +0200 ---------------------------------------------------------------------- .../drill/exec/store/pcap/PcapRecordReader.java | 43 +++++++++++++------- .../store/pcap/decoder/PacketConstants.java | 2 +- .../exec/store/pcap/decoder/PacketDecoder.java | 15 ++++++- .../drill/exec/store/pcap/TestPcapDecoder.java | 13 +++--- .../exec/store/pcap/TestPcapRecordReader.java | 5 ++- 5 files changed, 52 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/57e5ab26/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java index a20e1de..26e1e65 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java @@ -56,8 +56,9 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII; public class PcapRecordReader extends AbstractRecordReader { - private static final Logger logger = LoggerFactory.getLogger(PcapRecordReader.class); + static final int BUFFER_SIZE = 500_000; // this should be relatively large relative to max packet + private static final Logger logger = LoggerFactory.getLogger(PcapRecordReader.class); private static final int BATCH_SIZE = 40_000; private OutputMutator output; @@ -101,11 +102,10 @@ public class PcapRecordReader extends AbstractRecordReader { @Override public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException { try { - this.output = output; - this.buffer = new byte[100000]; this.in = fs.open(pathToFile); this.decoder = new PacketDecoder(in); + this.buffer = new byte[BUFFER_SIZE + decoder.getMaxLength()]; this.validBytes = in.read(buffer); this.projectedCols = getProjectedColsIfItNull(); setColumns(projectedColumns); @@ -197,20 +197,33 @@ public class PcapRecordReader extends AbstractRecordReader { private int parsePcapFilesAndPutItToTable() throws IOException { Packet packet = new Packet(); int counter = 0; - while (offset < validBytes && counter != BATCH_SIZE) { - - if (validBytes - offset < 9000) { - System.arraycopy(buffer, offset, buffer, 0, validBytes - offset); - validBytes = validBytes - offset; - offset = 0; - - int n = in.read(buffer, validBytes, buffer.length - validBytes); - if (n > 0) { - validBytes += n; + while (offset < validBytes && counter < BATCH_SIZE) { + if (validBytes - offset < decoder.getMaxLength()) { + if (validBytes == buffer.length) { + // shift data and read more. This is the common case. + System.arraycopy(buffer, offset, buffer, 0, validBytes - offset); + validBytes = validBytes - offset; + offset = 0; + + int n = in.read(buffer, validBytes, buffer.length - validBytes); + if (n > 0) { + validBytes += n; + } + logger.info("read {} bytes, at {} offset", n, validBytes); + } else { + // near the end of the file, we will just top up the buffer without shifting + int n = in.read(buffer, offset, buffer.length - offset); + if (n > 0) { + validBytes = validBytes + n; + logger.info("Topped up buffer with {} bytes to yield {}\n", n, validBytes); + } } } - - offset = decoder.decodePacket(buffer, offset, packet); + int old = offset; + offset = decoder.decodePacket(buffer, offset, packet, decoder.getMaxLength(), validBytes); + if (offset > validBytes) { + logger.error("Invalid packet at offset {}", old); + } if (addDataToTable(packet, decoder.getNetwork(), counter)) { counter++; http://git-wip-us.apache.org/repos/asf/drill/blob/57e5ab26/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java index 003f87e..2c87623 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketConstants.java @@ -20,7 +20,7 @@ package org.apache.drill.exec.store.pcap.decoder; @SuppressWarnings("WeakerAccess") public final class PacketConstants { - public static final int PCAP_HEADER_SIZE = 4 * 4; + public static final int PCAP_HEADER_SIZE = 4 * 4; // packet header, not file header public static final int TIMESTAMP_OFFSET = 0; public static final int TIMESTAMP_MICRO_OFFSET = 4; http://git-wip-us.apache.org/repos/asf/drill/blob/57e5ab26/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java index 704c3fd..7460aaa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/PacketDecoder.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.store.pcap.decoder; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; @@ -58,6 +60,7 @@ public class PacketDecoder { private static final int PCAP_MAGIC_LITTLE_ENDIAN = 0xD4C3B2A1; private static final int PCAP_MAGIC_NUMBER = 0xA1B2C3D4; + private static final Logger logger = LoggerFactory.getLogger(PacketDecoder.class); private final int maxLength; private final int network; @@ -89,8 +92,16 @@ public class PacketDecoder { network = getIntFileOrder(bigEndian, globalHeader, 20); } - public int decodePacket(final byte[] buffer, final int offset, Packet p) { - return p.decodePcap(buffer, offset, bigEndian, maxLength); + public final int getMaxLength() { + return maxLength; + } + + public int decodePacket(final byte[] buffer, final int offset, Packet p, int maxPacket, int validBytes) { + int r = p.decodePcap(buffer, offset, bigEndian, Math.min(maxPacket, validBytes - offset)); + if (r > validBytes) { + logger.error("Invalid packet at offset {}", offset); + } + return r; } public Packet packet() { http://git-wip-us.apache.org/repos/asf/drill/blob/57e5ab26/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java index 6e58ccf..c8cedc1 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java @@ -58,12 +58,12 @@ public class TestPcapDecoder extends BaseTestQuery { int offset = 0; - byte[] buffer = new byte[100000]; + byte[] buffer = new byte[PcapRecordReader.BUFFER_SIZE + pd.getMaxLength()]; int validBytes = in.read(buffer); assertTrue(validBytes > 50); - offset = pd.decodePacket(buffer, offset, p); - offset = pd.decodePacket(buffer, offset, p); + offset = pd.decodePacket(buffer, offset, p, pd.getMaxLength(), validBytes); + offset = pd.decodePacket(buffer, offset, p, pd.getMaxLength(), validBytes); assertEquals(228, offset); assertEquals("FE:00:00:00:00:02", p.getEthernetDestination()); @@ -104,6 +104,7 @@ public class TestPcapDecoder extends BaseTestQuery { // the code from here down is useful in that it tests the assumptions that // the entire package is based on, but it doesn't really define tests. // As such, it can be run as a main class, but isn't supported as unit tests. + /** * This tests the speed when creating an actual object for each packet. * <p> @@ -163,7 +164,7 @@ public class TestPcapDecoder extends BaseTestQuery { PacketDecoder pd = new PacketDecoder(in); Packet p = pd.packet(); - byte[] buffer = new byte[100000]; + byte[] buffer = new byte[PcapRecordReader.BUFFER_SIZE + pd.getMaxLength()]; int validBytes = in.read(buffer); int offset = 0; @@ -176,7 +177,7 @@ public class TestPcapDecoder extends BaseTestQuery { // get new data and shift current data to beginning of buffer if there is any danger // of straddling the buffer end in the next packet // even with jumbo packets this should be enough space to guarantee parsing - if (validBytes - offset < 9000) { + if (validBytes - offset < pd.getMaxLength()) { System.arraycopy(buffer, 0, buffer, offset, validBytes - offset); validBytes = validBytes - offset; offset = 0; @@ -188,7 +189,7 @@ public class TestPcapDecoder extends BaseTestQuery { } // decode the packet as it lies - offset = pd.decodePacket(buffer, offset, p); + offset = pd.decodePacket(buffer, offset, p, pd.getMaxLength(), validBytes); total += p.getPacketLength(); allCount++; if (p.isTcpPacket()) { http://git-wip-us.apache.org/repos/asf/drill/blob/57e5ab26/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java index 706694d..bb81469 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java @@ -48,7 +48,8 @@ public class TestPcapRecordReader extends BaseTestQuery { @Test public void testDistinctQuery() throws Exception { - runSQLVerifyCount("select distinct * from dfs.`store/pcap/tcp-1.pcap`", 1); + // omit data field from distinct count for now + runSQLVerifyCount("select distinct type, network, `timestamp`, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, packet_length from dfs.`store/pcap/tcp-1.pcap`", 1); } private void runSQLVerifyCount(String sql, int expectedRowCount) throws Exception { @@ -62,7 +63,7 @@ public class TestPcapRecordReader extends BaseTestQuery { private void printResultAndVerifyRowCount(List<QueryDataBatch> results, int expectedRowCount) throws SchemaChangeException { - setColumnWidth(25); + setColumnWidth(35); int rowCount = printResult(results); if (expectedRowCount != -1) { Assert.assertEquals(expectedRowCount, rowCount);
