This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/drill.git
commit 12487c9b34cf6211d9f954801ae63efc4e43280d Author: Charles S. Givre <[email protected]> AuthorDate: Mon Mar 25 11:40:29 2019 -0400 DRILL-7032: Ignore corrupt rows in a PCAP file closes #1637 --- .../drill/exec/store/pcap/PcapRecordReader.java | 51 +++++++++++++-------- .../drill/exec/store/pcap/decoder/Packet.java | 19 +++++++- .../drill/exec/store/pcap/schema/Schema.java | 25 +++++----- .../exec/store/pcap/TestPcapRecordReader.java | 7 +++ .../src/test/resources/store/pcap/testv1.pcap | Bin 0 -> 544736 bytes 5 files changed, 68 insertions(+), 34 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java index 08f501f..f9b8a72 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.pcap; +import org.apache.drill.exec.vector.NullableBitVector; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; import org.apache.drill.common.exceptions.ExecutionSetupException; @@ -85,11 +86,12 @@ public class PcapRecordReader extends AbstractRecordReader { static { TYPES = ImmutableMap.<PcapTypes, TypeProtos.MinorType>builder() - .put(PcapTypes.STRING, MinorType.VARCHAR) - .put(PcapTypes.INTEGER, MinorType.INT) - .put(PcapTypes.LONG, MinorType.BIGINT) - .put(PcapTypes.TIMESTAMP, MinorType.TIMESTAMP) - .build(); + .put(PcapTypes.STRING, MinorType.VARCHAR) + .put(PcapTypes.INTEGER, MinorType.INT) + .put(PcapTypes.LONG, MinorType.BIGINT) + .put(PcapTypes.TIMESTAMP, MinorType.TIMESTAMP) + .put(PcapTypes.BOOLEAN, MinorType.BIT) + .build(); } public PcapRecordReader(final Path pathToFile, @@ -112,8 +114,8 @@ public class PcapRecordReader extends 
AbstractRecordReader { setColumns(projectedColumns); } catch (IOException io) { throw UserException.dataReadError(io) - .addContext("File name:", pathToFile.toUri().getPath()) - .build(logger); + .addContext("File name:", pathToFile.toUri().getPath()) + .build(logger); } } @@ -123,8 +125,8 @@ public class PcapRecordReader extends AbstractRecordReader { return parsePcapFilesAndPutItToTable(); } catch (IOException io) { throw UserException.dataReadError(io) - .addContext("Trouble with reading packets in file!") - .build(logger); + .addContext("Trouble with reading packets in file!") + .build(logger); } } @@ -160,10 +162,10 @@ public class PcapRecordReader extends AbstractRecordReader { TypeProtos.MajorType majorType = getMajorType(minorType); MaterializedField field = - MaterializedField.create(name, majorType); + MaterializedField.create(name, majorType); ValueVector vector = - getValueVector(minorType, majorType, field); + getValueVector(minorType, majorType, field); return getProjectedColumnInfo(column, vector); } @@ -185,7 +187,7 @@ public class PcapRecordReader extends AbstractRecordReader { try { final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass( - minorType, majorType.getMode()); + minorType, majorType.getMode()); ValueVector vector = output.addField(field, clazz); vector.allocateNew(); return vector; @@ -223,6 +225,7 @@ public class PcapRecordReader extends AbstractRecordReader { int old = offset; offset = decoder.decodePacket(buffer, offset, packet, decoder.getMaxLength(), validBytes); if (offset > validBytes) { + //Start here... 
logger.error("Invalid packet at offset {}", old); } @@ -286,7 +289,7 @@ public class PcapRecordReader extends AbstractRecordReader { break; case "tcp_ack": if (packet.isTcpPacket()) { - setIntegerColumnValue(packet.getAckNumber(), pci, count); + setBooleanColumnValue(packet.getAckNumber(), pci, count); } break; case "tcp_flags": @@ -357,6 +360,9 @@ public class PcapRecordReader extends AbstractRecordReader { case "packet_length": setIntegerColumnValue(packet.getPacketLength(), pci, count); break; + case "is_corrupt": + setBooleanColumnValue(packet.isCorrupt(), pci, count); + break; case "data": if (packet.getData() != null) { setStringColumnValue(parseBytesToASCII(packet.getData()), pci, count); @@ -371,32 +377,37 @@ public class PcapRecordReader extends AbstractRecordReader { private void setLongColumnValue(long data, ProjectedColumnInfo pci, final int count) { ((NullableBigIntVector.Mutator) pci.vv.getMutator()) - .setSafe(count, data); + .setSafe(count, data); } private void setIntegerColumnValue(final int data, final ProjectedColumnInfo pci, final int count) { ((NullableIntVector.Mutator) pci.vv.getMutator()) - .setSafe(count, data); + .setSafe(count, data); } private void setBooleanColumnValue(final boolean data, final ProjectedColumnInfo pci, final int count) { - ((NullableIntVector.Mutator) pci.vv.getMutator()) - .setSafe(count, data ? 1 : 0); + ((NullableBitVector.Mutator) pci.vv.getMutator()) + .setSafe(count, data ? 
1 : 0); + } + + private void setBooleanColumnValue(final int data, final ProjectedColumnInfo pci, final int count) { + ((NullableBitVector.Mutator) pci.vv.getMutator()) + .setSafe(count, data); } private void setTimestampColumnValue(final long data, final ProjectedColumnInfo pci, final int count) { ((NullableTimeStampVector.Mutator) pci.vv.getMutator()) - .setSafe(count, data); + .setSafe(count, data); } private void setStringColumnValue(final String data, final ProjectedColumnInfo pci, final int count) { if (data == null) { ((NullableVarCharVector.Mutator) pci.vv.getMutator()) - .setNull(count); + .setNull(count); } else { ByteBuffer value = ByteBuffer.wrap(data.getBytes(UTF_8)); ((NullableVarCharVector.Mutator) pci.vv.getMutator()) - .setSafe(count, value, 0, value.remaining()); + .setSafe(count, value, 0, value.remaining()); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java index 8e6a81b..5afe7f0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java @@ -18,6 +18,8 @@ package org.apache.drill.exec.store.pcap.decoder; import org.apache.drill.shaded.guava.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; @@ -53,8 +55,11 @@ public class Packet { private int packetLength; protected int etherProtocol; protected int protocol; - protected boolean isRoutingV6; + protected boolean isCorrupt = false; + + private static final Logger logger = LoggerFactory.getLogger(Packet.class); + @SuppressWarnings("WeakerAccess") public boolean readPcap(final InputStream in, final boolean byteOrder, final int maxLength) throws IOException { @@ -312,6 +317,10 @@ public class Packet { return getPort(2); } + public boolean 
isCorrupt(){ + return isCorrupt; + } + public byte[] getData() { int payloadDataStart = getIPHeaderLength(); if (isTcpPacket()) { @@ -324,7 +333,13 @@ public class Packet { byte[] data = null; if (packetLength >= payloadDataStart) { data = new byte[packetLength - payloadDataStart]; - System.arraycopy(raw, ipOffset + payloadDataStart, data, 0, data.length); + try { + System.arraycopy(raw, ipOffset + payloadDataStart, data, 0, data.length); + } catch (Exception e) { + isCorrupt = true; + String message = "Error while parsing PCAP data: {}"; + logger.debug(message, e.getMessage()); + logger.trace(message, e); } } return data; } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java index 07ecd4b..e6a2f5d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java @@ -43,21 +43,22 @@ public class Schema { columns.add(new ColumnDto("src_mac_address", PcapTypes.STRING)); columns.add(new ColumnDto("dst_mac_address", PcapTypes.STRING)); columns.add(new ColumnDto("tcp_session", PcapTypes.LONG)); - columns.add(new ColumnDto("tcp_ack", PcapTypes.INTEGER)); + columns.add(new ColumnDto("tcp_ack", PcapTypes.BOOLEAN)); columns.add(new ColumnDto("tcp_flags", PcapTypes.INTEGER)); - columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.INTEGER)); - columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.INTEGER)); - columns.add(new ColumnDto("tcp_flags_ece ", PcapTypes.INTEGER )); - columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.INTEGER )); - columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", PcapTypes.INTEGER )); - columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.INTEGER )); - columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.INTEGER )); - columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.INTEGER )); - 
columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.INTEGER )); - columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.INTEGER )); - columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.INTEGER )); + columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_ece ", PcapTypes.BOOLEAN )); + columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.BOOLEAN )); + columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", PcapTypes.BOOLEAN )); + columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.BOOLEAN )); + columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.BOOLEAN )); + columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.BOOLEAN )); + columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.BOOLEAN )); + columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.BOOLEAN )); + columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.BOOLEAN )); columns.add(new ColumnDto("tcp_parsed_flags", PcapTypes.STRING)); columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER)); + columns.add(new ColumnDto("is_corrupt", PcapTypes.BOOLEAN)); columns.add(new ColumnDto("data", PcapTypes.STRING)); } diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java index 47ab015..ac1fe3b 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java @@ -43,6 +43,13 @@ public class TestPcapRecordReader extends BaseTestQuery { } @Test + public void testCorruptPCAPQuery() throws Exception { + runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap`", 7000); + runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap` WHERE is_corrupt=false", 6408); + runSQLVerifyCount("select * from 
dfs.`store/pcap/testv1.pcap` WHERE is_corrupt=true", 592); + } + + @Test public void testCountQuery() throws Exception { runSQLVerifyCount("select count(*) from dfs.`store/pcap/tcp-1.pcap`", 1); runSQLVerifyCount("select count(*) from dfs.`store/pcap/tcp-2.pcap`", 1); diff --git a/exec/java-exec/src/test/resources/store/pcap/testv1.pcap b/exec/java-exec/src/test/resources/store/pcap/testv1.pcap new file mode 100644 index 0000000..b52dbfb Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcap/testv1.pcap differ
