This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git

commit 12487c9b34cf6211d9f954801ae63efc4e43280d
Author: Charles S. Givre <[email protected]>
AuthorDate: Mon Mar 25 11:40:29 2019 -0400

    DRILL-7032: Ignore corrupt rows in a PCAP file
    
    closes #1637
---
 .../drill/exec/store/pcap/PcapRecordReader.java    |  51 +++++++++++++--------
 .../drill/exec/store/pcap/decoder/Packet.java      |  19 +++++++-
 .../drill/exec/store/pcap/schema/Schema.java       |  25 +++++-----
 .../exec/store/pcap/TestPcapRecordReader.java      |   7 +++
 .../src/test/resources/store/pcap/testv1.pcap      | Bin 0 -> 544736 bytes
 5 files changed, 68 insertions(+), 34 deletions(-)
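
The patch adds a BOOLEAN is_corrupt column to the PCAP schema: packets whose payload
cannot be copied out of the raw buffer are now flagged and returned instead of failing
the whole scan. A minimal usage sketch, assuming a dfs workspace pointed at the test
resources (the same queries the new test below verifies):

    SELECT * FROM dfs.`store/pcap/testv1.pcap` WHERE is_corrupt = false;  -- clean packets
    SELECT * FROM dfs.`store/pcap/testv1.pcap` WHERE is_corrupt = true;   -- flagged packets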

diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
index 08f501f..f9b8a72 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.pcap;
 
+import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
 import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
@@ -85,11 +86,12 @@ public class PcapRecordReader extends AbstractRecordReader {
 
   static {
     TYPES = ImmutableMap.<PcapTypes, TypeProtos.MinorType>builder()
-        .put(PcapTypes.STRING, MinorType.VARCHAR)
-        .put(PcapTypes.INTEGER, MinorType.INT)
-        .put(PcapTypes.LONG, MinorType.BIGINT)
-        .put(PcapTypes.TIMESTAMP, MinorType.TIMESTAMP)
-        .build();
+            .put(PcapTypes.STRING, MinorType.VARCHAR)
+            .put(PcapTypes.INTEGER, MinorType.INT)
+            .put(PcapTypes.LONG, MinorType.BIGINT)
+            .put(PcapTypes.TIMESTAMP, MinorType.TIMESTAMP)
+            .put(PcapTypes.BOOLEAN, MinorType.BIT)
+            .build();
   }
 
   public PcapRecordReader(final Path pathToFile,
@@ -112,8 +114,8 @@ public class PcapRecordReader extends AbstractRecordReader {
       setColumns(projectedColumns);
     } catch (IOException io) {
       throw UserException.dataReadError(io)
-          .addContext("File name:", pathToFile.toUri().getPath())
-          .build(logger);
+              .addContext("File name:", pathToFile.toUri().getPath())
+              .build(logger);
     }
   }
 
@@ -123,8 +125,8 @@ public class PcapRecordReader extends AbstractRecordReader {
       return parsePcapFilesAndPutItToTable();
     } catch (IOException io) {
       throw UserException.dataReadError(io)
-          .addContext("Trouble with reading packets in file!")
-          .build(logger);
+              .addContext("Trouble with reading packets in file!")
+              .build(logger);
     }
   }
 
@@ -160,10 +162,10 @@ public class PcapRecordReader extends AbstractRecordReader {
     TypeProtos.MajorType majorType = getMajorType(minorType);
 
     MaterializedField field =
-        MaterializedField.create(name, majorType);
+            MaterializedField.create(name, majorType);
 
     ValueVector vector =
-        getValueVector(minorType, majorType, field);
+            getValueVector(minorType, majorType, field);
 
     return getProjectedColumnInfo(column, vector);
   }
@@ -185,7 +187,7 @@ public class PcapRecordReader extends AbstractRecordReader {
     try {
 
      final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass(
-          minorType, majorType.getMode());
+              minorType, majorType.getMode());
       ValueVector vector = output.addField(field, clazz);
       vector.allocateNew();
       return vector;
@@ -223,6 +225,7 @@ public class PcapRecordReader extends AbstractRecordReader {
       int old = offset;
      offset = decoder.decodePacket(buffer, offset, packet, decoder.getMaxLength(), validBytes);
       if (offset > validBytes) {
+        // Packet ran past the valid bytes in the buffer: log the bad offset and keep reading.
         logger.error("Invalid packet at offset {}", old);
       }
 
@@ -286,7 +289,7 @@ public class PcapRecordReader extends AbstractRecordReader {
           break;
         case "tcp_ack":
           if (packet.isTcpPacket()) {
-            setIntegerColumnValue(packet.getAckNumber(), pci, count);
+            setBooleanColumnValue(packet.getAckNumber(), pci, count);
           }
           break;
         case "tcp_flags":
@@ -357,6 +360,9 @@ public class PcapRecordReader extends AbstractRecordReader {
         case "packet_length":
           setIntegerColumnValue(packet.getPacketLength(), pci, count);
           break;
+        case "is_corrupt":
+          setBooleanColumnValue(packet.isCorrupt(), pci, count);
+          break;
         case "data":
           if (packet.getData() != null) {
            setStringColumnValue(parseBytesToASCII(packet.getData()), pci, count);
@@ -371,32 +377,37 @@ public class PcapRecordReader extends AbstractRecordReader {
 
  private void setLongColumnValue(long data, ProjectedColumnInfo pci, final int count) {
     ((NullableBigIntVector.Mutator) pci.vv.getMutator())
-        .setSafe(count, data);
+            .setSafe(count, data);
   }
 
  private void setIntegerColumnValue(final int data, final ProjectedColumnInfo pci, final int count) {
     ((NullableIntVector.Mutator) pci.vv.getMutator())
-        .setSafe(count, data);
+            .setSafe(count, data);
   }
 
  private void setBooleanColumnValue(final boolean data, final ProjectedColumnInfo pci, final int count) {
-    ((NullableIntVector.Mutator) pci.vv.getMutator())
-        .setSafe(count, data ? 1 : 0);
+    ((NullableBitVector.Mutator) pci.vv.getMutator())
+            .setSafe(count, data ? 1 : 0);
+  }
+
+  private void setBooleanColumnValue(final int data, final ProjectedColumnInfo pci, final int count) {
+    ((NullableBitVector.Mutator) pci.vv.getMutator())
+            .setSafe(count, data);
   }
 
  private void setTimestampColumnValue(final long data, final ProjectedColumnInfo pci, final int count) {
     ((NullableTimeStampVector.Mutator) pci.vv.getMutator())
-        .setSafe(count, data);
+            .setSafe(count, data);
   }
 
  private void setStringColumnValue(final String data, final ProjectedColumnInfo pci, final int count) {
     if (data == null) {
       ((NullableVarCharVector.Mutator) pci.vv.getMutator())
-          .setNull(count);
+              .setNull(count);
     } else {
       ByteBuffer value = ByteBuffer.wrap(data.getBytes(UTF_8));
       ((NullableVarCharVector.Mutator) pci.vv.getMutator())
-          .setSafe(count, value, 0, value.remaining());
+              .setSafe(count, value, 0, value.remaining());
     }
   }
 
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
index 8e6a81b..5afe7f0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java
@@ -18,6 +18,8 @@
 package org.apache.drill.exec.store.pcap.decoder;
 
 import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -53,8 +55,11 @@ public class Packet {
   private int packetLength;
   protected int etherProtocol;
   protected int protocol;
-
   protected boolean isRoutingV6;
+  protected boolean isCorrupt = false;
+
+  private static final Logger logger = LoggerFactory.getLogger(Packet.class);
+
 
   @SuppressWarnings("WeakerAccess")
  public boolean readPcap(final InputStream in, final boolean byteOrder, final int maxLength) throws IOException {
@@ -312,6 +317,10 @@ public class Packet {
     return getPort(2);
   }
 
+  public boolean isCorrupt() {
+    return isCorrupt;
+  }
+
   public byte[] getData() {
     int payloadDataStart = getIPHeaderLength();
     if (isTcpPacket()) {
@@ -324,7 +333,13 @@ public class Packet {
     byte[] data = null;
     if (packetLength >= payloadDataStart) {
       data = new byte[packetLength - payloadDataStart];
-      System.arraycopy(raw, ipOffset + payloadDataStart, data, 0, data.length);
+      try {
+        System.arraycopy(raw, ipOffset + payloadDataStart, data, 0, data.length);
+      } catch (Exception e) {
+        isCorrupt = true;
+        String message = "Error while parsing PCAP data: {}";
+        logger.debug(message, e.getMessage());
+        logger.trace(message, e);      }
     }
     return data;
   }
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
index 07ecd4b..e6a2f5d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java
@@ -43,21 +43,22 @@ public class Schema {
     columns.add(new ColumnDto("src_mac_address", PcapTypes.STRING));
     columns.add(new ColumnDto("dst_mac_address", PcapTypes.STRING));
     columns.add(new ColumnDto("tcp_session", PcapTypes.LONG));
-    columns.add(new ColumnDto("tcp_ack", PcapTypes.INTEGER));
+    columns.add(new ColumnDto("tcp_ack", PcapTypes.BOOLEAN));
     columns.add(new ColumnDto("tcp_flags", PcapTypes.INTEGER));
-    columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.INTEGER));
-    columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.INTEGER));
-    columns.add(new ColumnDto("tcp_flags_ece ", PcapTypes.INTEGER ));
-    columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.INTEGER 
));
-    columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", 
PcapTypes.INTEGER ));
-    columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.INTEGER ));
-    columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.INTEGER ));
-    columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.INTEGER ));
-    columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.INTEGER ));
-    columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.INTEGER ));
-    columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.INTEGER ));
+    columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.BOOLEAN));
+    columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.BOOLEAN));
+    columns.add(new ColumnDto("tcp_flags_ece ", PcapTypes.BOOLEAN ));
+    columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.BOOLEAN 
));
+    columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", 
PcapTypes.BOOLEAN ));
+    columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.BOOLEAN ));
+    columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.BOOLEAN ));
+    columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.BOOLEAN ));
+    columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.BOOLEAN ));
+    columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.BOOLEAN ));
+    columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.BOOLEAN ));
     columns.add(new ColumnDto("tcp_parsed_flags", PcapTypes.STRING));
     columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER));
+    columns.add(new ColumnDto("is_corrupt", PcapTypes.BOOLEAN));
     columns.add(new ColumnDto("data", PcapTypes.STRING));
   }
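
Note that tcp_ack and the individual tcp_flags_* columns move from INTEGER to BOOLEAN
here, so filters that previously compared these fields against 1/0 would presumably now
compare against true/false. A sketch under the same dfs workspace assumption, using the
tcp-1.pcap file already referenced by the existing tests:

    SELECT tcp_flags_syn, tcp_flags_ack, tcp_parsed_flags
    FROM dfs.`store/pcap/tcp-1.pcap`
    WHERE tcp_flags_syn = true;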
 
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
index 47ab015..ac1fe3b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapRecordReader.java
@@ -43,6 +43,13 @@ public class TestPcapRecordReader extends BaseTestQuery {
   }
 
   @Test
+  public void testCorruptPCAPQuery() throws Exception {
+    runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap`", 7000);
+    runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap` WHERE 
is_corrupt=false", 6408);
+    runSQLVerifyCount("select * from dfs.`store/pcap/testv1.pcap` WHERE 
is_corrupt=true", 592);
+  }
+
+  @Test
   public void testCountQuery() throws Exception {
     runSQLVerifyCount("select count(*) from dfs.`store/pcap/tcp-1.pcap`", 1);
     runSQLVerifyCount("select count(*) from dfs.`store/pcap/tcp-2.pcap`", 1);
diff --git a/exec/java-exec/src/test/resources/store/pcap/testv1.pcap b/exec/java-exec/src/test/resources/store/pcap/testv1.pcap
new file mode 100644
index 0000000..b52dbfb
Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcap/testv1.pcap differ
