This is an automated email from the ASF dual-hosted git repository.

volodymyr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/drill.git
commit bd549b9cbfa538038ec440aa8c623b3bdb40d50b Author: Charles Givre <[email protected]> AuthorDate: Fri Nov 22 16:04:29 2019 -0500 DRILL-7443: Enable PCAP Plugin to Reassemble TCP Streams closes #1898 --- .../drill/exec/store/pcap/PcapBatchReader.java | 203 ++++++++-- .../drill/exec/store/pcap/PcapFormatConfig.java | 8 +- .../drill/exec/store/pcap/PcapRecordReader.java | 418 --------------------- .../drill/exec/store/pcap/decoder/Packet.java | 49 ++- .../exec/store/pcap/decoder/TcpHandshake.java | 144 +++++++ .../drill/exec/store/pcap/decoder/TcpSession.java | 321 ++++++++++++++++ .../drill/exec/store/pcap/schema/PcapTypes.java | 5 +- .../drill/exec/store/pcap/schema/Schema.java | 72 ++-- .../store/dfs/TestFormatPluginOptionExtractor.java | 4 +- .../drill/exec/store/pcap/TestPcapDecoder.java | 8 +- .../drill/exec/store/pcap/TestPcapEVFReader.java | 1 - .../drill/exec/store/pcap/TestSessionizePCAP.java | 107 ++++++ .../test/resources/store/pcap/attack-trace.pcap | Bin 0 -> 189103 bytes 13 files changed, 850 insertions(+), 490 deletions(-) diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java index c9caa24..5e4a46c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapBatchReader.java @@ -26,6 +26,7 @@ import org.apache.drill.exec.record.metadata.SchemaBuilder; import org.apache.drill.exec.record.metadata.TupleMetadata; import org.apache.drill.exec.store.pcap.decoder.Packet; import org.apache.drill.exec.store.pcap.decoder.PacketDecoder; +import org.apache.drill.exec.store.pcap.decoder.TcpSession; import org.apache.drill.exec.store.pcap.schema.Schema; import org.apache.drill.exec.vector.accessor.ScalarWriter; import org.apache.hadoop.mapred.FileSplit; @@ -35,12 +36,14 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InputStream; +import java.util.HashMap; +import java.util.Map; import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII; public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { - public static final int BUFFER_SIZE = 500_000; + protected static final int BUFFER_SIZE = 500_000; private static final Logger logger = LoggerFactory.getLogger(PcapBatchReader.class); @@ -116,17 +119,56 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { private ScalarWriter isCorruptWriter; + private PcapReaderConfig readerConfig; + + + // Writers for TCP Sessions + private ScalarWriter sessionStartTimeWriter; + + private ScalarWriter sessionEndTimeWriter; + + private ScalarWriter sessionDurationWriter; + + private ScalarWriter connectionTimeWriter; + + private ScalarWriter packetCountWriter; + + private ScalarWriter originPacketCounterWriter; + + private ScalarWriter remotePacketCounterWriter; + + private ScalarWriter originDataVolumeWriter; + + private ScalarWriter remoteDataVolumeWriter; + + private ScalarWriter hostDataWriter; + + private ScalarWriter remoteDataWriter; + + + private Map<Long, TcpSession> sessionQueue; + public static class PcapReaderConfig { protected final PcapFormatPlugin plugin; + public boolean sessionizeTCPStreams; + + private PcapFormatConfig config; + public PcapReaderConfig(PcapFormatPlugin plugin) { this.plugin = plugin; + this.config = plugin.getConfig(); + this.sessionizeTCPStreams = config.sessionizeTCPStreams; } } public 
PcapBatchReader(PcapReaderConfig readerConfig) { + this.readerConfig = readerConfig; + if (readerConfig.sessionizeTCPStreams) { + sessionQueue = new HashMap<>(); + } } @Override @@ -134,46 +176,14 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { split = negotiator.split(); openFile(negotiator); SchemaBuilder builder = new SchemaBuilder(); - Schema pcapSchema = new Schema(); + Schema pcapSchema = new Schema(readerConfig.sessionizeTCPStreams); TupleMetadata schema = pcapSchema.buildSchema(builder); negotiator.setTableSchema(schema, false); ResultSetLoader loader = negotiator.build(); // Creates writers for all fields (Since schema is known) rowWriter = loader.writer(); - typeWriter = rowWriter.scalar("type"); - timestampWriter = rowWriter.scalar("packet_timestamp"); - timestampMicroWriter = rowWriter.scalar("timestamp_micro"); - networkWriter = rowWriter.scalar("network"); - srcMacAddressWriter = rowWriter.scalar("src_mac_address"); - dstMacAddressWriter = rowWriter.scalar("dst_mac_address"); - dstIPWriter = rowWriter.scalar("dst_ip"); - srcIPWriter = rowWriter.scalar("src_ip"); - srcPortWriter = rowWriter.scalar("src_port"); - dstPortWriter = rowWriter.scalar("dst_port"); - packetLengthWriter = rowWriter.scalar("packet_length"); - - //Writers for TCP Packets - tcpSessionWriter = rowWriter.scalar("tcp_session"); - tcpSequenceWriter = rowWriter.scalar("tcp_sequence"); - tcpAckNumberWriter = rowWriter.scalar("tcp_ack"); - tcpFlagsWriter = rowWriter.scalar("tcp_flags"); - tcpParsedFlagsWriter = rowWriter.scalar("tcp_parsed_flags"); - tcpNsWriter = rowWriter.scalar("tcp_flags_ns"); - tcpCwrWriter = rowWriter.scalar("tcp_flags_cwr"); - tcpEceWriter = rowWriter.scalar("tcp_flags_ece"); - tcpFlagsEceEcnCapableWriter = rowWriter.scalar("tcp_flags_ece_ecn_capable"); - tcpFlagsCongestionWriter = rowWriter.scalar("tcp_flags_ece_congestion_experienced"); - - tcpUrgWriter = rowWriter.scalar("tcp_flags_urg"); - tcpAckWriter = rowWriter.scalar("tcp_flags_ack"); - tcpPshWriter = rowWriter.scalar("tcp_flags_psh"); - tcpRstWriter = rowWriter.scalar("tcp_flags_rst"); - tcpSynWriter = rowWriter.scalar("tcp_flags_syn"); - tcpFinWriter = rowWriter.scalar("tcp_flags_fin"); - - dataWriter = rowWriter.scalar("data"); - isCorruptWriter = rowWriter.scalar("is_corrupt"); + populateColumnWriters(rowWriter); return true; } @@ -190,6 +200,14 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { @Override public void close() { + + /* This warning could occur in the event of a corrupt or incomplete PCAP file. Specifically, + * if a session is started in one file and the end of the session is not captured in the same file. 
+ */ + if (sessionQueue != null && !sessionQueue.isEmpty()) { + logger.warn("Unclosed sessions remaining in PCAP"); + } + try { fsStream.close(); } catch (IOException e) { @@ -217,7 +235,69 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { } } + private void populateColumnWriters(RowSetLoader rowWriter) { + if (readerConfig.sessionizeTCPStreams) { + srcMacAddressWriter = rowWriter.scalar("src_mac_address"); + dstMacAddressWriter = rowWriter.scalar("dst_mac_address"); + dstIPWriter = rowWriter.scalar("dst_ip"); + srcIPWriter = rowWriter.scalar("src_ip"); + srcPortWriter = rowWriter.scalar("src_port"); + dstPortWriter = rowWriter.scalar("dst_port"); + sessionStartTimeWriter = rowWriter.scalar("session_start_time"); + sessionEndTimeWriter = rowWriter.scalar("session_end_time"); + sessionDurationWriter = rowWriter.scalar("session_duration"); + connectionTimeWriter = rowWriter.scalar("connection_time"); + tcpSessionWriter = rowWriter.scalar("tcp_session"); + packetCountWriter = rowWriter.scalar("total_packet_count"); + hostDataWriter = rowWriter.scalar("data_from_originator"); + remoteDataWriter = rowWriter.scalar("data_from_remote"); + + originPacketCounterWriter = rowWriter.scalar("packet_count_from_origin"); + remotePacketCounterWriter = rowWriter.scalar("packet_count_from_remote"); + originDataVolumeWriter = rowWriter.scalar("data_volume_from_origin"); + remoteDataVolumeWriter = rowWriter.scalar("data_volume_from_remote"); + isCorruptWriter = rowWriter.scalar("is_corrupt"); + + } else { + typeWriter = rowWriter.scalar("type"); + timestampWriter = rowWriter.scalar("packet_timestamp"); + timestampMicroWriter = rowWriter.scalar("timestamp_micro"); + networkWriter = rowWriter.scalar("network"); + srcMacAddressWriter = rowWriter.scalar("src_mac_address"); + dstMacAddressWriter = rowWriter.scalar("dst_mac_address"); + dstIPWriter = rowWriter.scalar("dst_ip"); + srcIPWriter = rowWriter.scalar("src_ip"); + srcPortWriter = rowWriter.scalar("src_port"); + dstPortWriter = rowWriter.scalar("dst_port"); + packetLengthWriter = rowWriter.scalar("packet_length"); + + //Writers for TCP Packets + tcpSessionWriter = rowWriter.scalar("tcp_session"); + tcpSequenceWriter = rowWriter.scalar("tcp_sequence"); + tcpAckNumberWriter = rowWriter.scalar("tcp_ack"); + tcpFlagsWriter = rowWriter.scalar("tcp_flags"); + tcpParsedFlagsWriter = rowWriter.scalar("tcp_parsed_flags"); + tcpNsWriter = rowWriter.scalar("tcp_flags_ns"); + tcpCwrWriter = rowWriter.scalar("tcp_flags_cwr"); + tcpEceWriter = rowWriter.scalar("tcp_flags_ece"); + tcpFlagsEceEcnCapableWriter = rowWriter.scalar("tcp_flags_ece_ecn_capable"); + tcpFlagsCongestionWriter = rowWriter.scalar("tcp_flags_ece_congestion_experienced"); + + tcpUrgWriter = rowWriter.scalar("tcp_flags_urg"); + tcpAckWriter = rowWriter.scalar("tcp_flags_ack"); + tcpPshWriter = rowWriter.scalar("tcp_flags_psh"); + tcpRstWriter = rowWriter.scalar("tcp_flags_rst"); + tcpSynWriter = rowWriter.scalar("tcp_flags_syn"); + tcpFinWriter = rowWriter.scalar("tcp_flags_fin"); + + dataWriter = rowWriter.scalar("data"); + isCorruptWriter = rowWriter.scalar("is_corrupt"); + } + } + private boolean parseNextPacket(RowSetLoader rowWriter) { + + // Decode the packet Packet packet = new Packet(); if (offset >= validBytes) { @@ -234,8 +314,28 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { packet.setIsCorrupt(true); logger.debug("Invalid packet at offset {}", old); } - addDataToTable(packet, decoder.getNetwork(), rowWriter); + // If we are 
resessionizing the TCP Stream, add the packet to the stream + if (readerConfig.sessionizeTCPStreams) { + // If the session has not been seen before, add it to the queue + long sessionID = packet.getSessionHash(); + if (!sessionQueue.containsKey(sessionID)) { + logger.debug("Adding session {} to session queue.", sessionID); + sessionQueue.put(sessionID, new TcpSession(sessionID)); + } + + // When the session is closed, write it and remove it from the session queue. + sessionQueue.get(sessionID).addPacket(packet); + if (sessionQueue.get(sessionID).connectionClosed()) { + // Write out the session + addSessionDataToTable(sessionQueue.get(sessionID), rowWriter); + // Remove from the queue + sessionQueue.remove(sessionID); + } + + } else { + addDataToTable(packet, decoder.getNetwork(), rowWriter); + } return true; } @@ -268,7 +368,35 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { return true; } - private boolean addDataToTable(Packet packet, int networkType, RowSetLoader rowWriter) { + private void addSessionDataToTable(TcpSession session, RowSetLoader rowWriter) { + rowWriter.start(); + + sessionStartTimeWriter.setTimestamp(session.getSessionStartTime()); + sessionEndTimeWriter.setTimestamp(session.getSessionEndTime()); + sessionDurationWriter.setPeriod(session.getSessionDuration()); + connectionTimeWriter.setPeriod(session.getConnectionTime()); + + srcMacAddressWriter.setString(session.getSrcMac()); + dstMacAddressWriter.setString(session.getDstMac()); + srcIPWriter.setString(session.getSrcIP()); + dstIPWriter.setString(session.getDstIP()); + srcPortWriter.setInt(session.getSrcPort()); + dstPortWriter.setInt(session.getDstPort()); + tcpSessionWriter.setLong(session.getSessionID()); + packetCountWriter.setInt(session.getPacketCount()); + + originPacketCounterWriter.setInt(session.getPacketCountFromOrigin()); + remotePacketCounterWriter.setInt(session.getPacketCountFromRemote()); + originDataVolumeWriter.setInt(session.getDataFromOriginator().length); + remoteDataVolumeWriter.setInt(session.getDataFromRemote().length); + isCorruptWriter.setBoolean(session.hasCorruptedData()); + + hostDataWriter.setString(session.getDataFromOriginatorAsString()); + remoteDataWriter.setString(session.getDataFromRemoteAsString()); + rowWriter.save(); + } + + private void addDataToTable(Packet packet, int networkType, RowSetLoader rowWriter) { rowWriter.start(); typeWriter.setString(packet.getPacketType()); @@ -312,6 +440,5 @@ public class PcapBatchReader implements ManagedReader<FileSchemaNegotiator> { // TODO Parse Data Packet Here: // Description of work in // DRILL-7400: Add Packet Decoders with Interface to Drill - return true; } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java index c06ddf9..133b5d7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapFormatConfig.java @@ -34,13 +34,16 @@ public class PcapFormatConfig implements FormatPluginConfig { public List<String> extensions; @JsonInclude(JsonInclude.Include.NON_DEFAULT) + public boolean sessionizeTCPStreams = false; + + @JsonInclude(JsonInclude.Include.NON_DEFAULT) public List<String> getExtensions() { return extensions == null ? 
DEFAULT_EXTS : extensions; } @Override public int hashCode() { - return Arrays.hashCode(new Object[]{extensions}); + return Arrays.hashCode(new Object[]{extensions, sessionizeTCPStreams}); } @Override @@ -52,6 +55,7 @@ public class PcapFormatConfig implements FormatPluginConfig { return false; } PcapFormatConfig other = (PcapFormatConfig) obj; - return Objects.equal(extensions, other.extensions); + return Objects.equal(extensions, other.extensions) + && Objects.equal(sessionizeTCPStreams, other.sessionizeTCPStreams); } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java deleted file mode 100644 index f9b8a72..0000000 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/PcapRecordReader.java +++ /dev/null @@ -1,418 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.drill.exec.store.pcap; - -import org.apache.drill.exec.vector.NullableBitVector; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList; -import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap; -import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.exceptions.UserException; -import org.apache.drill.common.expression.SchemaPath; -import org.apache.drill.common.types.TypeProtos; -import org.apache.drill.common.types.TypeProtos.MajorType; -import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.types.Types; -import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.ops.OperatorContext; -import org.apache.drill.exec.physical.impl.OutputMutator; -import org.apache.drill.exec.record.MaterializedField; -import org.apache.drill.exec.store.AbstractRecordReader; -import org.apache.drill.exec.store.pcap.decoder.Packet; -import org.apache.drill.exec.store.pcap.decoder.PacketDecoder; -import org.apache.drill.exec.store.pcap.dto.ColumnDto; -import org.apache.drill.exec.store.pcap.schema.PcapTypes; -import org.apache.drill.exec.store.pcap.schema.Schema; -import org.apache.drill.exec.vector.NullableBigIntVector; -import org.apache.drill.exec.vector.NullableIntVector; -import org.apache.drill.exec.vector.NullableTimeStampVector; -import org.apache.drill.exec.vector.NullableVarCharVector; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; - 
-import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII; - -public class PcapRecordReader extends AbstractRecordReader { - static final int BUFFER_SIZE = 500_000; // this should be relatively large relative to max packet - - private static final Logger logger = LoggerFactory.getLogger(PcapRecordReader.class); - private static final int BATCH_SIZE = 40_000; - - private OutputMutator output; - - private PacketDecoder decoder; - private ImmutableList<ProjectedColumnInfo> projectedCols; - private FileSystem fs; - - private byte[] buffer; - private int offset = 0; - private FSDataInputStream in; - private int validBytes; - - private final Path pathToFile; - private List<SchemaPath> projectedColumns; - - private static final Map<PcapTypes, MinorType> TYPES; - - private static class ProjectedColumnInfo { - ValueVector vv; - ColumnDto pcapColumn; - } - - static { - TYPES = ImmutableMap.<PcapTypes, TypeProtos.MinorType>builder() - .put(PcapTypes.STRING, MinorType.VARCHAR) - .put(PcapTypes.INTEGER, MinorType.INT) - .put(PcapTypes.LONG, MinorType.BIGINT) - .put(PcapTypes.TIMESTAMP, MinorType.TIMESTAMP) - .put(PcapTypes.BOOLEAN, MinorType.BIT) - .build(); - } - - public PcapRecordReader(final Path pathToFile, - final FileSystem fileSystem, - final List<SchemaPath> projectedColumns) { - this.fs = fileSystem; - this.pathToFile = fs.makeQualified(pathToFile); - this.projectedColumns = projectedColumns; - } - - @Override - public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException { - try { - this.output = output; - this.in = fs.open(pathToFile); - this.decoder = new PacketDecoder(in); - this.buffer = new byte[BUFFER_SIZE + decoder.getMaxLength()]; - this.validBytes = in.read(buffer); - this.projectedCols = getProjectedColsIfItNull(); - setColumns(projectedColumns); - } catch (IOException io) { - throw UserException.dataReadError(io) - .addContext("File name:", pathToFile.toUri().getPath()) - .build(logger); - } - } - - @Override - public int next() { - try { - return parsePcapFilesAndPutItToTable(); - } catch (IOException io) { - throw UserException.dataReadError(io) - .addContext("Trouble with reading packets in file!") - .build(logger); - } - } - - @Override - public void close() throws Exception { - in.close(); - } - - private ImmutableList<ProjectedColumnInfo> getProjectedColsIfItNull() { - return projectedCols != null ? 
projectedCols : initCols(new Schema()); - } - - private ImmutableList<ProjectedColumnInfo> initCols(final Schema schema) { - ImmutableList.Builder<ProjectedColumnInfo> pciBuilder = ImmutableList.builder(); - ColumnDto column; - - for (int i = 0; i < schema.getNumberOfColumns(); i++) { - column = schema.getColumnByIndex(i); - - final String name = column.getColumnName().toLowerCase(); - final PcapTypes type = column.getColumnType(); - TypeProtos.MinorType minorType = TYPES.get(type); - - ProjectedColumnInfo pci = getProjectedColumnInfo(column, name, minorType); - pciBuilder.add(pci); - } - return pciBuilder.build(); - } - - private ProjectedColumnInfo getProjectedColumnInfo(final ColumnDto column, - final String name, - final MinorType minorType) { - TypeProtos.MajorType majorType = getMajorType(minorType); - - MaterializedField field = - MaterializedField.create(name, majorType); - - ValueVector vector = - getValueVector(minorType, majorType, field); - - return getProjectedColumnInfo(column, vector); - } - - private ProjectedColumnInfo getProjectedColumnInfo(final ColumnDto column, final ValueVector vector) { - ProjectedColumnInfo pci = new ProjectedColumnInfo(); - pci.vv = vector; - pci.pcapColumn = column; - return pci; - } - - private MajorType getMajorType(final MinorType minorType) { - return Types.optional(minorType); - } - - private ValueVector getValueVector(final MinorType minorType, - final MajorType majorType, - final MaterializedField field) { - try { - - final Class<? extends ValueVector> clazz = TypeHelper.getValueVectorClass( - minorType, majorType.getMode()); - ValueVector vector = output.addField(field, clazz); - vector.allocateNew(); - return vector; - - } catch (SchemaChangeException sce) { - throw new IllegalStateException("The addition of this field is incompatible with this OutputMutator's capabilities"); - } - } - - private int parsePcapFilesAndPutItToTable() throws IOException { - Packet packet = new Packet(); - int counter = 0; - while (offset < validBytes && counter < BATCH_SIZE) { - if (validBytes - offset < decoder.getMaxLength()) { - if (validBytes == buffer.length) { - // shift data and read more. This is the common case. - System.arraycopy(buffer, offset, buffer, 0, validBytes - offset); - validBytes = validBytes - offset; - offset = 0; - - int n = in.read(buffer, validBytes, buffer.length - validBytes); - if (n > 0) { - validBytes += n; - } - logger.info("read {} bytes, at {} offset", n, validBytes); - } else { - // near the end of the file, we will just top up the buffer without shifting - int n = in.read(buffer, offset, buffer.length - offset); - if (n > 0) { - validBytes = validBytes + n; - logger.info("Topped up buffer with {} bytes to yield {}\n", n, validBytes); - } - } - } - int old = offset; - offset = decoder.decodePacket(buffer, offset, packet, decoder.getMaxLength(), validBytes); - if (offset > validBytes) { - //Start here... 
- logger.error("Invalid packet at offset {}", old); - } - - if (addDataToTable(packet, decoder.getNetwork(), counter)) { - counter++; - } - } - return counter; - } - - private boolean addDataToTable(final Packet packet, final int networkType, final int count) { - for (ProjectedColumnInfo pci : projectedCols) { - switch (pci.pcapColumn.getColumnName()) { - case "type": - setStringColumnValue(packet.getPacketType(), pci, count); - break; - case "timestamp": - setTimestampColumnValue(packet.getTimestamp(), pci, count); - break; - case "timestamp_micro": - setLongColumnValue(packet.getTimestampMicro(), pci, count); - break; - case "network": - setIntegerColumnValue(networkType, pci, count); - break; - case "src_mac_address": - setStringColumnValue(packet.getEthernetSource(), pci, count); - break; - case "dst_mac_address": - setStringColumnValue(packet.getEthernetDestination(), pci, count); - break; - case "dst_ip": - if (packet.getDst_ip() != null) { - setStringColumnValue(packet.getDst_ip().getHostAddress(), pci, count); - } else { - setStringColumnValue(null, pci, count); - } - break; - case "src_ip": - if (packet.getSrc_ip() != null) { - setStringColumnValue(packet.getSrc_ip().getHostAddress(), pci, count); - } else { - setStringColumnValue(null, pci, count); - } - break; - case "src_port": - setIntegerColumnValue(packet.getSrc_port(), pci, count); - break; - case "dst_port": - setIntegerColumnValue(packet.getDst_port(), pci, count); - break; - case "tcp_session": - if (packet.isTcpPacket()) { - setLongColumnValue(packet.getSessionHash(), pci, count); - } - break; - case "tcp_sequence": - if (packet.isTcpPacket()) { - setIntegerColumnValue(packet.getSequenceNumber(), pci, count); - } - break; - case "tcp_ack": - if (packet.isTcpPacket()) { - setBooleanColumnValue(packet.getAckNumber(), pci, count); - } - break; - case "tcp_flags": - if (packet.isTcpPacket()) { - setIntegerColumnValue(packet.getFlags(), pci, count); - } - break; - case "tcp_parsed_flags": - if (packet.isTcpPacket()) { - setStringColumnValue(packet.getParsedFlags(), pci, count); - } - break; - case "tcp_flags_ns": - if (packet.isTcpPacket()) { - setBooleanColumnValue((packet.getFlags() & 0x100) != 0, pci, count); - } - break; - case "tcp_flags_cwr": - if (packet.isTcpPacket()) { - setBooleanColumnValue((packet.getFlags() & 0x80) != 0, pci, count); - } - break; - case "tcp_flags_ece ": - if (packet.isTcpPacket()) { - setBooleanColumnValue((packet.getFlags() & 0x40) != 0, pci, count); - } - break; - case "tcp_flags_ece_ecn_capable": - if (packet.isTcpPacket()) { - setBooleanColumnValue((packet.getFlags() & 0x42) == 0x42, pci, count); - } - break; - case "tcp_flags_ece_congestion_experienced": - if (packet.isTcpPacket()) { - setBooleanColumnValue((packet.getFlags() & 0x42) == 0x40, pci, count); - } - break; - case "tcp_flags_urg": - if (packet.isTcpPacket()) { - setBooleanColumnValue((packet.getFlags() & 0x20) != 0, pci, count); - } - break; - case "tcp_flags_ack": - if (packet.isTcpPacket()) { - setBooleanColumnValue((packet.getFlags() & 0x10) != 0, pci, count); - } - break; - case "tcp_flags_psh": - if (packet.isTcpPacket()) { - setBooleanColumnValue((packet.getFlags() & 0x8) != 0, pci, count); - } - break; - case "tcp_flags_rst": - if (packet.isTcpPacket()) { - setBooleanColumnValue((packet.getFlags() & 0x4) != 0, pci, count); - } - break; - case "tcp_flags_syn": - if (packet.isTcpPacket()) { - setBooleanColumnValue((packet.getFlags() & 0x2) != 0, pci, count); - } - break; - case "tcp_flags_fin": - if (packet.isTcpPacket()) { 
- setBooleanColumnValue((packet.getFlags() & 0x1) != 0, pci, count); - } - break; - case "packet_length": - setIntegerColumnValue(packet.getPacketLength(), pci, count); - break; - case "is_corrupt": - setBooleanColumnValue(packet.isCorrupt(), pci, count); - break; - case "data": - if (packet.getData() != null) { - setStringColumnValue(parseBytesToASCII(packet.getData()), pci, count); - } else { - setStringColumnValue("[]", pci, count); - } - break; - } - } - return true; - } - - private void setLongColumnValue(long data, ProjectedColumnInfo pci, final int count) { - ((NullableBigIntVector.Mutator) pci.vv.getMutator()) - .setSafe(count, data); - } - - private void setIntegerColumnValue(final int data, final ProjectedColumnInfo pci, final int count) { - ((NullableIntVector.Mutator) pci.vv.getMutator()) - .setSafe(count, data); - } - - private void setBooleanColumnValue(final boolean data, final ProjectedColumnInfo pci, final int count) { - ((NullableBitVector.Mutator) pci.vv.getMutator()) - .setSafe(count, data ? 1 : 0); - } - - private void setBooleanColumnValue(final int data, final ProjectedColumnInfo pci, final int count) { - ((NullableBitVector.Mutator) pci.vv.getMutator()) - .setSafe(count, data); - } - - private void setTimestampColumnValue(final long data, final ProjectedColumnInfo pci, final int count) { - ((NullableTimeStampVector.Mutator) pci.vv.getMutator()) - .setSafe(count, data); - } - - private void setStringColumnValue(final String data, final ProjectedColumnInfo pci, final int count) { - if (data == null) { - ((NullableVarCharVector.Mutator) pci.vv.getMutator()) - .setNull(count); - } else { - ByteBuffer value = ByteBuffer.wrap(data.getBytes(UTF_8)); - ((NullableVarCharVector.Mutator) pci.vv.getMutator()) - .setSafe(count, value, 0, value.remaining()); - } - } - - @Override - public String toString() { - return "PcapRecordReader[File=" + pathToFile.toUri() + "]"; - } -} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java index 803a779..7d49699 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/Packet.java @@ -33,7 +33,7 @@ import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getByte; import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getIntFileOrder; import static org.apache.drill.exec.store.pcap.PcapFormatUtils.getShort; -public class Packet { +public class Packet implements Comparable<Packet> { // pcap header // typedef struct pcaprec_hdr_s { // guint32 ts_sec; // timestamp seconds @@ -223,6 +223,42 @@ public class Packet { isCorrupt = value; } + public boolean getUrgFlag() { + return (getFlags() & 0x20) != 0; + } + + public boolean getPshFlag() { + return (getFlags() & 0x8) != 0; + } + + public boolean getEceFlag() { + return (getFlags() & 0x40) != 0; + } + + public boolean getSynFlag() { + return (getFlags() & 0x2) != 0; + } + + public boolean getAckFlag() { + return (getFlags() & 0x10) != 0; + } + + public boolean getRstFlag() { + return (getFlags() & 0x4) != 0; + } + + public boolean getFinFlag() { + return (getFlags() & 0x1) != 0; + } + + public boolean getNSFlag() { + return (getFlags() & 0x100) != 0; + } + + public boolean getCwrFlag() { + return (getFlags() & 0x80) != 0; + } + public static String formatFlags(int flags) { int mask = 0x100; StringBuilder r = new StringBuilder(); @@ -491,4 +527,15 @@ 
public class Packet { int dstPortOffset = ipOffset + getIPHeaderLength() + offset; return convertShort(raw, dstPortOffset); } + + /** + * This function is here so that packets can be sorted for re-sessionization. Packets in TCP streams + * are ordered by the sequence number, so being able to order the packets is necessary to reassemble the + * TCP session. + * @param o The packet to which the current packet is compared to. + * @return Returns the difference in sequence number. + */ + public int compareTo(Packet o) { + return this.getSequenceNumber() - (o).getSequenceNumber(); + } } diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java new file mode 100644 index 0000000..2fc3d05 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpHandshake.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.pcap.decoder; + +/** + * This class is used to record the status of the TCP Handshake. Initially this is used just to determine whether a session is open or closed, but + * future functionality could include SYN flood identification, or other hackery with TCP flags. + */ +public class TcpHandshake { + private boolean isConnected = false; + + private State currentSessionState = State.NONE; + + private long sessionID; + + /** + * Returns true for a correct TCP handshake: SYN|SYNACK|ACK, False if not. + * + * @return boolean true if the session is open, false if not. + */ + public boolean isConnected() { + return isConnected; + } + + /** + * This method returns true if the session is closed properly via FIN -> FIN ACK, false if not. + * + * @return boolean true if the session is closed, false if not. 
*/ + public boolean isClosed() { + if (currentSessionState == State.CLOSE_WAIT || + currentSessionState == State.FORCED_CLOSED || + currentSessionState == State.CLOSED || + currentSessionState == State.TIME_WAIT || + currentSessionState == State.FIN_WAIT) { + return true; + } else { + return false; + } + } + + public State getCurrentSessionState() { + return currentSessionState; + } + + public void setConnected(long sessionID) { + this.sessionID = sessionID; + currentSessionState = State.OPEN; + isConnected = true; + } + + public void setRst() { + isConnected = false; + currentSessionState = State.FORCED_CLOSED; + } + + public void setFin() { + if (currentSessionState == State.OPEN) { + currentSessionState = State.CLOSE_WAIT; // The next packet should be another FIN packet + } else if (currentSessionState == State.CLOSE_WAIT) { + currentSessionState = State.TIME_WAIT; + } + } + + public void setAck() { + if (currentSessionState == State.SYN) { + currentSessionState = State.SYNACK; + } else if (currentSessionState == State.SYNACK) { + currentSessionState = State.OPEN; + isConnected = true; + } else if (currentSessionState == State.CLOSE_WAIT) { + currentSessionState = State.FIN_WAIT; + } + } + + public void setSyn() { + if (currentSessionState == State.NONE) { + currentSessionState = State.SYN; + } + } + + /** + * This enum represents the various states of the TCP handshake. + */ + enum State { + /** + * The NONE state is the initialization state. No session has been established. + */ + NONE, + /** + * The OPEN state represents a successfully opened TCP session. It is established in the final step in the TCP + * handshake. + */ + OPEN, + /** + * The CLOSED state represents a closed TCP session. This state occurs after the final ACK of the four-way close process. + */ + CLOSED, + /** + * The CLOSE_WAIT state represents a session in which one party has sent a frame with a FIN flag set. + * At this point resources can be released; however, to fully close the session the other party needs to send a frame + * with an ACK packet. + */ + CLOSE_WAIT, + /** + * This state occurs after receiving the first FIN/ACK frame. The recipient will then follow with a FIN frame, closing the session. + */ + TIME_WAIT, + /** + * The SYN state represents the first step in the TCP handshake. The originator has sent a frame with the SYN flag set. + * The next step would be the SYN/ACK stage. + */ + SYN, + /** + * This state represents the second step of the TCP handshake, in which the recipient acknowledges the originator's SYN + * flag. + */ + SYNACK, + /** + * The FORCED_CLOSED state represents a session which was closed forcefully by an RST frame. + */ + FORCED_CLOSED, + /** + * The FIN_WAIT state occurs after receiving the initial FIN flag. + */ + FIN_WAIT + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java new file mode 100644 index 0000000..5900fd6 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/decoder/TcpSession.java @@ -0,0 +1,321 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.pcap.decoder; + +import org.joda.time.Instant; +import org.joda.time.Period; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.apache.drill.exec.store.pcap.PcapFormatUtils.parseBytesToASCII; + +/** + * This class is the representation of a TCP session. + */ +public class TcpSession { + + private final List<Packet> packetsFromSender; + private final List<Packet> packetsFromReceiver; + private final TcpHandshake handshake; + private final long sessionID; + + private long startTime; + private long endTime; + private int packetCount; + private InetAddress srcIP; + private InetAddress dstIP; + private int srcPort; + private int dstPort; + private String srcMac; + private String dstMac; + private long synTime; + private long ackTime; + private long connectTime; + private byte[] sentData; + private byte[] receivedData; + private int sentDataSize; + private int receivedDataSize; + private boolean hasCorruptedData = false; + + + private static final Logger logger = LoggerFactory.getLogger(TcpSession.class); + + public TcpSession (long sessionID) { + packetsFromSender = new ArrayList<>(); + packetsFromReceiver = new ArrayList<>(); + + handshake = new TcpHandshake(); + this.sessionID = sessionID; + } + + /** + * This function adds a packet to the TCP session. 
+ * @param p The Packet to be added to the session + */ + public void addPacket(Packet p) { + + // Only attempt to add TCP packets to session + if (!p.getPacketType().equalsIgnoreCase("TCP")) { + return; + } + + // These variables should be consistent within a TCP session + if (packetCount == 0) { + srcIP = p.getSrc_ip(); + dstIP = p.getDst_ip(); + + srcPort = p.getSrc_port(); + dstPort = p.getDst_port(); + + srcMac = p.getEthernetSource(); + dstMac = p.getEthernetDestination(); + startTime = p.getTimestamp(); + } else if (p.getSessionHash() != sessionID) { + logger.warn("Attempting to add session {} to incorrect TCP session.", sessionID); + return; + } + + // Add packet to appropriate list and increment the data size counter + if (p.getSrc_ip().getHostAddress().equalsIgnoreCase(srcIP.getHostAddress())) { + packetsFromSender.add(p); + // Increment the data size counters + if (p.getData() != null) { + sentDataSize += p.getData().length; + } + + } else { + packetsFromReceiver.add(p); + if (p.getData() != null) { + receivedDataSize += p.getData().length; + } + } + + // Check flags if connection is not established + if (!handshake.isConnected()) { + if (p.getSynFlag() && p.getSrc_ip().getHostAddress().equalsIgnoreCase(srcIP.getHostAddress())) { + // This is part 1 of the TCP session handshake + // The host sends the first SYN packet + handshake.setSyn(); + synTime = p.getTimestamp(); + } else if (p.getSynFlag() && p.getAckFlag() && p.getSrc_ip().getHostAddress().equalsIgnoreCase(dstIP.getHostAddress())) { + // This condition represents the second part of the TCP Handshake, + // where the receiver sends a frame with the SYN/ACK flags set to the originator + handshake.setAck(); + } else if (p.getAckFlag() && p.getSrc_ip().getHostAddress().equalsIgnoreCase(srcIP.getHostAddress())) { + // Finally, this condition represents a successful opening of a TCP session, when the originator sends a frame with only the ACK flag set. + // At this point we finalize the session object and clear out the flags. + handshake.setAck(); + ackTime = p.getTimestamp(); + connectTime = ackTime - synTime; + //handshake.setConnected(sessionID); + } + } else { + /* Check for flags to close connection. Closing a TCP session is more difficult than opening a session and there are + * a lot of ways that it can go wrong. See https://accedian.com/enterprises/blog/close-tcp-sessions-diagnose-disconnections/ for references on + * closing TCP sessions. + * + * To close a session correctly, there are four steps: + * 1. Party A sends a frame to party B with the FIN flag + * 2. Party B sends a frame with an ACK flag + * 3. Party B then follows with a frame with the FIN flag set + * 4. Party A then confirms with an ACK flag. + * + * Technically, the session is closed upon the first FIN flag and resources can be released at that point. However, a lot can go wrong, so to force the closing of a + * session, either party can send a frame with a RST flag set which forces the closing of the session. + */ + + if (p.getRstFlag()) { + // This is the case for a forced closed connection. Session is immediately closed. + handshake.setRst(); + } else if (p.getFinFlag()) { + // This is the beginning of the normal session closure procedure. 
If a FIN flag has been seen, the session is basically closed even if one party continues to send + // data, + handshake.setFin(); + } else if (handshake.getCurrentSessionState() == TcpHandshake.State.CLOSE_WAIT) { + + } + if (p.getAckFlag() && p.getFinFlag()) { + handshake.setAck(); + } + } + + // Augment the packet counter + packetCount++; + + if (p.isCorrupt()) { + hasCorruptedData = true; + } + + // Add the start and ending time stamp. The packets are not necessarily received in order, so we have to check the timestamps this way + if (p.getTimestampMicro() < startTime) { + startTime = p.getTimestamp(); + } + + if (p.getTimestampMicro() > endTime) { + endTime = p.getTimestamp(); + } + + // Close the session if the closing handshake is complete + if (handshake.isClosed()) { + closeSession(); + } + } + + /** + * This function returns true if the TCP session has been established, false if not. + * @return True if the session has been established, false if not. + */ + public boolean connectionEstablished() { + return handshake.isConnected(); + } + + public boolean connectionClosed() { + return handshake.isClosed(); + } + + public void closeSession() { + logger.debug("Closing session {}", sessionID); + /* The sent and received bytes cannot be written until the session is closed. + * Upon receipt of the FIN->FIN/ACK handshake, write everything. + * + * Since it cannot be assumed that the packets were received in the correct order, we must: + * 1. Sort them by TCP Sequence Number + * 2. Write the data to the respective byte array + */ + + Collections.sort(packetsFromSender); + Collections.sort(packetsFromReceiver); + + sentData = new byte[sentDataSize]; + receivedData = new byte[receivedDataSize]; + + byte[] dataFromPacket; + int dataOffset = 0; + // Now that the lists are sorted, add packet data to the lists + for (int i = 0; i < packetsFromSender.size(); i++) { + // Get the packet; + Packet p = packetsFromSender.get(i); + dataFromPacket = p.getData(); + if (dataFromPacket != null) { + for (int j = 0; j < dataFromPacket.length; j++) { + sentData[dataOffset] = dataFromPacket[j]; + dataOffset++; + } + } + } + + dataOffset = 0; + for (int i = 0; i < packetsFromReceiver.size(); i++) { + // Get the packet; + Packet p = packetsFromReceiver.get(i); + dataFromPacket = p.getData(); + if (dataFromPacket != null) { + for (int j = 0; j < dataFromPacket.length; j++) { + receivedData[dataOffset] = dataFromPacket[j]; + dataOffset++; + } + } + } + } + + public Instant getSessionStartTime() { + return new Instant(startTime); + } + + public Period getSessionDuration() { + return new Period(endTime - startTime); + } + + public Period getConnectionTime() { + return new Period(connectTime); + } + + public Instant getSessionEndTime() { + return new Instant(endTime); + } + + public String getSrcMac() { + return srcMac; + } + + public String getDstMac() { + return dstMac; + } + + public String getSrcIP() { + return srcIP.getHostAddress(); + } + + public String getDstIP() { + return dstIP.getHostAddress(); + } + + public int getSrcPort() { + return srcPort; + } + + public int getDstPort() { + return dstPort; + } + + public long getSessionID() { + return sessionID; + } + + public int getPacketCount() { + return packetsFromReceiver.size() + packetsFromSender.size(); + } + + public int getPacketCountFromOrigin() { return packetsFromSender.size(); } + + public int getPacketCountFromRemote() { return packetsFromReceiver.size(); } + + public boolean hasCorruptedData() { + return hasCorruptedData; + } + + public int 
getDataVolumeFromOrigin() { + return sentData.length; + } + + public int getDataVolumeFromRemote() { + return receivedData.length; + } + + public byte[] getDataFromOriginator() { + return sentData; + } + + public String getDataFromOriginatorAsString() { + return parseBytesToASCII(sentData); + } + + public byte[] getDataFromRemote() { + return receivedData; + } + + public String getDataFromRemoteAsString() { + return parseBytesToASCII(receivedData); + } +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java index fc6e029..c1fb4db 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/PcapTypes.java @@ -22,5 +22,6 @@ public enum PcapTypes { INTEGER, STRING, LONG, - TIMESTAMP -} \ No newline at end of file + TIMESTAMP, + DURATION +} diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java index d88bc97..7655d26 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pcap/schema/Schema.java @@ -30,8 +30,10 @@ public class Schema { private final List<ColumnDto> columns = new ArrayList<>(); private final MinorType typeMap[] = new MinorType[PcapTypes.values().length]; + private final boolean sessionizeTCPStreams; - public Schema() { + public Schema(boolean sessionSchema) { + this.sessionizeTCPStreams = sessionSchema; setupStructure(); } @@ -41,36 +43,58 @@ public class Schema { typeMap[PcapTypes.STRING.ordinal()] = MinorType.VARCHAR; typeMap[PcapTypes.LONG.ordinal()] = MinorType.BIGINT; typeMap[PcapTypes.TIMESTAMP.ordinal()] = MinorType.TIMESTAMP; + typeMap[PcapTypes.DURATION.ordinal()] = MinorType.INTERVAL; - columns.add(new ColumnDto("type", PcapTypes.STRING)); - columns.add(new ColumnDto("network", PcapTypes.INTEGER)); - columns.add(new ColumnDto("packet_timestamp", PcapTypes.TIMESTAMP)); - columns.add(new ColumnDto("timestamp_micro", PcapTypes.LONG)); + // Common columns columns.add(new ColumnDto("src_ip", PcapTypes.STRING)); columns.add(new ColumnDto("dst_ip", PcapTypes.STRING)); columns.add(new ColumnDto("src_port", PcapTypes.INTEGER)); columns.add(new ColumnDto("dst_port", PcapTypes.INTEGER)); columns.add(new ColumnDto("src_mac_address", PcapTypes.STRING)); columns.add(new ColumnDto("dst_mac_address", PcapTypes.STRING)); - columns.add(new ColumnDto("tcp_session", PcapTypes.LONG)); - columns.add(new ColumnDto("tcp_sequence", PcapTypes.INTEGER)); - columns.add(new ColumnDto("tcp_ack", PcapTypes.BOOLEAN)); - columns.add(new ColumnDto("tcp_flags", PcapTypes.INTEGER)); - columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.BOOLEAN)); - columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.BOOLEAN)); - columns.add(new ColumnDto("tcp_flags_ece", PcapTypes.BOOLEAN )); - columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.BOOLEAN )); - columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", PcapTypes.BOOLEAN )); - columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.BOOLEAN )); - columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.BOOLEAN )); - columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.BOOLEAN )); - columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.BOOLEAN )); - columns.add(new 
ColumnDto("tcp_flags_syn", PcapTypes.BOOLEAN )); - columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.BOOLEAN )); - columns.add(new ColumnDto("tcp_parsed_flags", PcapTypes.STRING)); - columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER)); - columns.add(new ColumnDto("is_corrupt", PcapTypes.BOOLEAN)); - columns.add(new ColumnDto("data", PcapTypes.STRING)); + + // Columns specific for Sessionized TCP Sessions + if (sessionizeTCPStreams) { + columns.add(new ColumnDto("session_start_time", PcapTypes.TIMESTAMP)); + columns.add(new ColumnDto("session_end_time", PcapTypes.TIMESTAMP)); + columns.add(new ColumnDto("session_duration", PcapTypes.DURATION)); + columns.add(new ColumnDto("total_packet_count", PcapTypes.INTEGER)); + columns.add(new ColumnDto("data_volume_from_origin", PcapTypes.INTEGER)); + columns.add(new ColumnDto("data_volume_from_remote", PcapTypes.INTEGER)); + columns.add(new ColumnDto("packet_count_from_origin", PcapTypes.INTEGER)); + columns.add(new ColumnDto("packet_count_from_remote", PcapTypes.INTEGER)); + + columns.add(new ColumnDto("connection_time", PcapTypes.DURATION)); + columns.add(new ColumnDto("tcp_session", PcapTypes.LONG)); + columns.add(new ColumnDto("is_corrupt", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("data_from_originator", PcapTypes.STRING)); + columns.add(new ColumnDto("data_from_remote", PcapTypes.STRING)); + } else { + // Columns for Regular Packets + columns.add(new ColumnDto("type", PcapTypes.STRING)); + columns.add(new ColumnDto("network", PcapTypes.INTEGER)); + columns.add(new ColumnDto("packet_timestamp", PcapTypes.TIMESTAMP)); + columns.add(new ColumnDto("timestamp_micro", PcapTypes.LONG)); + columns.add(new ColumnDto("tcp_session", PcapTypes.LONG)); + columns.add(new ColumnDto("tcp_sequence", PcapTypes.INTEGER)); + columns.add(new ColumnDto("tcp_ack", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags", PcapTypes.INTEGER)); + columns.add(new ColumnDto("tcp_flags_ns", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_cwr", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_ece", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_ece_ecn_capable", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_ece_congestion_experienced", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_urg", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_ack", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_psh", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_rst", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_syn", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_flags_fin", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("tcp_parsed_flags", PcapTypes.STRING)); + columns.add(new ColumnDto("packet_length", PcapTypes.INTEGER)); + columns.add(new ColumnDto("is_corrupt", PcapTypes.BOOLEAN)); + columns.add(new ColumnDto("data", PcapTypes.STRING)); + } } /** diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java index f43afb7..7721dc7 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestFormatPluginOptionExtractor.java @@ -58,11 +58,13 @@ public class TestFormatPluginOptionExtractor { break; case "json": case "sequencefile": - case "pcap": case "pcapng": case "avro": 
assertEquals(d.typeName, "(type: String)", d.presentParams()); break; + case "pcap": + assertEquals(d.typeName, "(type: String, sessionizeTCPStreams: boolean)", d.presentParams()); + break; case "httpd": assertEquals("(type: String, logFormat: String, timestampFormat: String)", d.presentParams()); break; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java index f0e89d9..54021d2 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapDecoder.java @@ -22,6 +22,8 @@ import org.apache.drill.test.BaseTestQuery; import org.apache.drill.exec.store.pcap.decoder.Packet; import org.apache.drill.exec.store.pcap.decoder.PacketDecoder; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.BufferedInputStream; import java.io.DataOutputStream; @@ -35,7 +37,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class TestPcapDecoder extends BaseTestQuery { - private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPcapDecoder.class); + private static final Logger logger = LoggerFactory.getLogger(TestPcapDecoder.class); private static File bigFile; @@ -61,7 +63,7 @@ public class TestPcapDecoder extends BaseTestQuery { int offset = 0; - byte[] buffer = new byte[PcapRecordReader.BUFFER_SIZE + pd.getMaxLength()]; + byte[] buffer = new byte[PcapBatchReader.BUFFER_SIZE + pd.getMaxLength()]; int validBytes = in.read(buffer); assertTrue(validBytes > 50); @@ -168,7 +170,7 @@ public class TestPcapDecoder extends BaseTestQuery { PacketDecoder pd = new PacketDecoder(in); Packet p = pd.packet(); - byte[] buffer = new byte[PcapRecordReader.BUFFER_SIZE + pd.getMaxLength()]; + byte[] buffer = new byte[PcapBatchReader.BUFFER_SIZE + pd.getMaxLength()]; int validBytes = in.read(buffer); int offset = 0; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java index 246691e..5b07423 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestPcapEVFReader.java @@ -29,7 +29,6 @@ import org.apache.drill.test.ClusterTest; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; - import java.time.LocalDateTime; import java.time.Month; diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java new file mode 100644 index 0000000..8c2818d --- /dev/null +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/pcap/TestSessionizePCAP.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.drill.exec.store.pcap; + + +import org.apache.drill.test.ClusterFixture; +import org.apache.drill.test.ClusterTest; +import org.joda.time.Period; +import java.nio.file.Paths; +import java.time.LocalDateTime; +import org.junit.BeforeClass; +import org.junit.Test; +import java.time.format.DateTimeFormatter; + +import static org.junit.Assert.assertEquals; + +public class TestSessionizePCAP extends ClusterTest { + + private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS"); + + @BeforeClass + public static void setup() throws Exception { + ClusterTest.startCluster(ClusterFixture.builder(dirTestWatcher)); + + PcapFormatConfig sampleConfig = new PcapFormatConfig(); + sampleConfig.sessionizeTCPStreams = true; + + cluster.defineFormat("cp", "pcap", sampleConfig); + dirTestWatcher.copyResourceToRoot(Paths.get("store/pcap/")); + } + + @Test + public void testSessionizedStarQuery() throws Exception { + String sql = "SELECT * FROM cp.`/store/pcap/attack-trace.pcap` WHERE src_port=1821 AND dst_port=445"; + + testBuilder() + .sqlQuery(sql) + .ordered() + .baselineColumns("session_start_time", "session_end_time", "session_duration", "total_packet_count", "connection_time", "src_ip", "dst_ip", "src_port", "dst_port", + "src_mac_address", "dst_mac_address", "tcp_session", "is_corrupt", "data_from_originator", "data_from_remote", "data_volume_from_origin", + "data_volume_from_remote", "packet_count_from_origin", "packet_count_from_remote") + .baselineValues(LocalDateTime.parse("2009-04-20T03:28:28.374", formatter), + LocalDateTime.parse("2009-04-20T03:28:28.508", formatter), + Period.parse("PT0.134S"), 4, + Period.parse("PT0.119S"), + "98.114.205.102", + "192.150.11.111", + 1821, 445, + "00:08:E2:3B:56:01", + "00:30:48:62:4E:4A", + -8791568836279708938L, + false, + "........I....>...>..........Ib...<...<..........I....>...>", "", 62,0, 3, 1) + .go(); + } + + @Test + public void testSessionizedSpecificQuery() throws Exception { + String sql = "SELECT session_start_time, session_end_time,session_duration, total_packet_count, connection_time, src_ip, dst_ip, src_port, dst_port, src_mac_address, dst_mac_address, tcp_session, " + + "is_corrupt, data_from_originator, data_from_remote, data_volume_from_origin, data_volume_from_remote, packet_count_from_origin, packet_count_from_remote " + + "FROM cp.`/store/pcap/attack-trace.pcap` WHERE src_port=1821 AND dst_port=445"; + + testBuilder() + .sqlQuery(sql) + .ordered() + .baselineColumns("session_start_time", "session_end_time", "session_duration", "total_packet_count", "connection_time", "src_ip", "dst_ip", "src_port", "dst_port", + "src_mac_address", "dst_mac_address", "tcp_session", "is_corrupt", "data_from_originator", "data_from_remote", "data_volume_from_origin", + "data_volume_from_remote", "packet_count_from_origin", "packet_count_from_remote") + .baselineValues(LocalDateTime.parse("2009-04-20T03:28:28.374", formatter), + LocalDateTime.parse("2009-04-20T03:28:28.508", formatter), + Period.parse("PT0.134S"), 4, + Period.parse("PT0.119S"), + "98.114.205.102", + 
"192.150.11.111", + 1821, 445, + "00:08:E2:3B:56:01", + "00:30:48:62:4E:4A", + -8791568836279708938L, + false, + "........I....>...>..........Ib...<...<..........I....>...>", "", 62,0, 3, 1) + .go(); + } + + @Test + public void testSerDe() throws Exception { + String sql = "SELECT COUNT(*) FROM cp.`/store/pcap/attack-trace.pcap`"; + String plan = queryBuilder().sql(sql).explainJson(); + long cnt = queryBuilder().physical(plan).singletonLong(); + assertEquals("Counts should match", 5L, cnt); + } +} diff --git a/exec/java-exec/src/test/resources/store/pcap/attack-trace.pcap b/exec/java-exec/src/test/resources/store/pcap/attack-trace.pcap new file mode 100644 index 0000000..68e1fff Binary files /dev/null and b/exec/java-exec/src/test/resources/store/pcap/attack-trace.pcap differ

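One piece of machinery this commit relies on but does not show is Packet.getSessionHash(), the key of the reader's sessionQueue. For reassembly to work, that key must be direction-independent: the originator's SYN and the remote's SYN/ACK must map to the same TcpSession. Below is a minimal sketch of how such a key can be built, assuming a symmetric hash over the TCP 4-tuple; the class, names, and arithmetic are hypothetical, not Drill's actual implementation.

import java.util.Objects;

// Hypothetical sketch: a direction-independent key over the TCP 4-tuple.
final class SessionKeySketch {

  // Hash each (ip, port) endpoint on its own, then combine the two
  // endpoint hashes with commutative operations so that swapping source
  // and destination (i.e. reversing packet direction) cannot change
  // the result.
  static long symmetricHash(String ipA, int portA, String ipB, int portB) {
    long a = Objects.hash(ipA, portA);
    long b = Objects.hash(ipB, portB);
    return a * b + a + b; // commutative in (a, b)
  }

  public static void main(String[] args) {
    long outbound = symmetricHash("98.114.205.102", 1821, "192.150.11.111", 445);
    long inbound = symmetricHash("192.150.11.111", 445, "98.114.205.102", 1821);
    System.out.println(outbound == inbound); // true: both directions share one bucket
  }
}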