This is an automated email from the ASF dual-hosted git repository. exceptionfactory pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 4b63d690631ebde9fdf0f40de59a7aff287a260e Author: JackHintonSmartDCSIT <[email protected]> AuthorDate: Tue Apr 23 11:12:26 2024 +0100 NIFI-13082 Added SplitPCAP Processor This closes #8691 Signed-off-by: David Handermann <[email protected]> --- .../nifi-network-processors/pom.xml | 11 + .../processors/network/pcap/ByteBufferReader.java | 59 +++++ .../apache/nifi/processors/network/pcap/PCAP.java | 127 ++++++++++ .../nifi/processors/network/pcap/PCAPHeader.java | 83 +++++++ .../nifi/processors/network/pcap/Packet.java | 145 +++++++++++ .../nifi/processors/network/pcap/SplitPCAP.java | 265 +++++++++++++++++++++ .../services/org.apache.nifi.processor.Processor | 3 +- .../nifi/processors/network/pcap/TestPCAP.java | 104 ++++++++ .../processors/network/pcap/TestSplitPCAP.java | 134 +++++++++++ 9 files changed, 930 insertions(+), 1 deletion(-) diff --git a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/pom.xml b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/pom.xml index cd03dcc5af..34a9e5e3ec 100644 --- a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/pom.xml +++ b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/pom.xml @@ -39,5 +39,16 @@ <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mock</artifactId> + <version>2.0.0-SNAPSHOT</version> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.junit.jupiter</groupId> + <artifactId>junit-jupiter-api</artifactId> + <scope>test</scope> + </dependency> </dependencies> </project> diff --git a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/ByteBufferReader.java b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/ByteBufferReader.java new file mode 100644 index 
0000000000..bfa0db81da --- /dev/null +++ b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/ByteBufferReader.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
 * Sequential reader that decodes little-endian values and raw byte runs from a byte array.
 *
 * <p>Source file: ByteBufferReader.java, package {@code org.apache.nifi.processors.network.pcap}.
 * Every read advances the position of the wrapped buffer; reading past the end surfaces the
 * {@link java.nio.BufferUnderflowException} thrown by {@link ByteBuffer}.
 */
public class ByteBufferReader {
    private final ByteBuffer buffer;

    /**
     * Wraps the given array (no copy is made) and fixes the byte order to little-endian.
     *
     * @param byteArray bytes to read from
     */
    public ByteBufferReader(byte[] byteArray) {
        this.buffer = ByteBuffer.wrap(byteArray).order(ByteOrder.LITTLE_ENDIAN);
    }

    /**
     * @return the next two bytes as an unsigned 16-bit value (0..65535)
     */
    public int readU2() {
        return Short.toUnsignedInt(buffer.getShort());
    }

    /**
     * @return the next four bytes as an unsigned 32-bit value (0..4294967295)
     */
    public long readU4() {
        return Integer.toUnsignedLong(buffer.getInt());
    }

    /**
     * @return the next four bytes as a signed 32-bit value
     */
    public int readS4() {
        return buffer.getInt();
    }

    /**
     * Reads exactly {@code n} bytes.
     *
     * @param n number of bytes to read
     * @return a new array holding the bytes read
     */
    public byte[] readBytes(int n) {
        final byte[] output = new byte[n];
        buffer.get(output);
        return output;
    }

    /**
     * Reads exactly {@code n} bytes, where the count comes from an unsigned 32-bit field.
     *
     * @param n number of bytes to read
     * @return a new array holding the bytes read
     * @throws ArithmeticException if {@code n} does not fit in an {@code int}; a plain cast
     *         would silently wrap (for example 2^32 would become 0) and return the wrong
     *         amount of data
     */
    public byte[] readBytes(long n) {
        return readBytes(Math.toIntExact(n));
    }

    /**
     * @return the number of bytes not yet consumed
     */
    public int bytesLeft() {
        return buffer.remaining();
    }

    /**
     * @return {@code true} while at least one byte is left to read
     */
    public boolean hasRemaining() {
        return buffer.hasRemaining();
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.network.pcap; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; + +import java.util.ArrayList; +import java.util.List; + +/** + * PCAP (named after libpcap / winpcap) is a popular format for saving + * network traffic grabbed by network sniffers. It is typically + * produced by tools like [tcpdump](<a href="https://www.tcpdump.org/">...</a>) or + * [Wireshark](<a href="https://www.wireshark.org/">...</a>). + * + * @see <a href= + * "https://wiki.wireshark.org/Development/LibpcapFileFormat">Source</a> + */ +public class PCAP { + private final PCAPHeader hdr; + private final List<Packet> packets; + + public PCAP(ByteBufferReader io) { + this.hdr = new PCAPHeader(io); + this.packets = new ArrayList<>(); + while (io.hasRemaining()) { + this.packets.add(new Packet(io, this)); + } + } + + public PCAP(PCAPHeader hdr, List<Packet> packets) { + this.hdr = hdr; + this.packets = packets; + } + + public byte[] toByteArray() { + int headerBufferSize = PCAPHeader.PCAP_HEADER_LENGTH; + ByteBuffer headerBuffer = ByteBuffer.allocate(headerBufferSize); + headerBuffer.order(ByteOrder.LITTLE_ENDIAN); + + headerBuffer.put(this.hdr.magicNumber()); + headerBuffer.put(readIntToNBytes(this.hdr.versionMajor(), 2)); + headerBuffer.put(readIntToNBytes(this.hdr.versionMinor(), 2)); + headerBuffer.put(readIntToNBytes(this.hdr.thiszone(), 4)); + headerBuffer.put(readLongToNBytes(this.hdr.sigfigs(), 4, true)); + headerBuffer.put(readLongToNBytes(this.hdr.snaplen(), 4, true)); + headerBuffer.put(readLongToNBytes(this.hdr.network(), 4, 
true)); + + List<byte[]> packetByteArrays = new ArrayList<>(); + + int packetBufferSize = 0; + + for (Packet currentPacket : packets) { + int currentPacketTotalLength = Packet.PACKET_HEADER_LENGTH + currentPacket.rawBody().length; + + ByteBuffer currentPacketBytes = ByteBuffer.allocate(currentPacketTotalLength); + currentPacketBytes.put(readLongToNBytes(currentPacket.tsSec(), 4, false)); + currentPacketBytes.put(readLongToNBytes(currentPacket.tsUsec(), 4, false)); + currentPacketBytes.put(readLongToNBytes(currentPacket.inclLen(), 4, false)); + currentPacketBytes.put(readLongToNBytes(currentPacket.origLen(), 4, false)); + currentPacketBytes.put(currentPacket.rawBody()); + + packetByteArrays.add(currentPacketBytes.array()); + packetBufferSize += currentPacketTotalLength; + } + + ByteBuffer packetBuffer = ByteBuffer.allocate(packetBufferSize); + packetBuffer.order(ByteOrder.LITTLE_ENDIAN); + + for (byte[] packetByteArray : packetByteArrays) { + packetBuffer.put(packetByteArray); + } + + ByteBuffer allBytes = ByteBuffer.allocate(headerBufferSize + packetBufferSize); + allBytes.order(ByteOrder.LITTLE_ENDIAN); + + allBytes.put(headerBuffer.array()); + allBytes.put(packetBuffer.array()); + + return allBytes.array(); + } + + protected static byte[] readIntToNBytes(int input, int numberOfBytes) { + byte[] output = new byte[numberOfBytes]; + output[0] = (byte) (input & 0xff); + for (int loop = 1; loop < numberOfBytes; loop++) { + output[loop] = (byte) (input >>> (8 * loop)); + } + return output; + } + + private byte[] readLongToNBytes(long input, int numberOfBytes, boolean isSigned) { + byte[] output = new byte[numberOfBytes]; + output[0] = (byte) (input & 0xff); + for (int loop = 1; loop < numberOfBytes; loop++) { + if (isSigned) { + output[loop] = (byte) (input >> (8 * loop)); + } else { + output[loop] = (byte) (input >>> (8 * loop)); + } + } + return output; + } + + public PCAPHeader getHeader() { + return hdr; + } + + public List<Packet> getPackets() { + return packets; 
+ } +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAPHeader.java b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAPHeader.java new file mode 100644 index 0000000000..6dd5bff330 --- /dev/null +++ b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAPHeader.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.network.pcap; + +public class PCAPHeader { + static final int PCAP_HEADER_LENGTH = 24; + private final byte[] magicNumber; + private final int versionMajor; + private final int versionMinor; + private final int thiszone; + private final long sigfigs; + private final long snaplen; + private final long network; + + public PCAPHeader(ByteBufferReader io) { + this.magicNumber = io.readBytes(4); + this.versionMajor = io.readU2(); + this.versionMinor = io.readU2(); + this.thiszone = io.readS4(); + this.sigfigs = io.readU4(); + this.snaplen = io.readU4(); + this.network = io.readU4(); + } + + public byte[] magicNumber() { + return magicNumber; + } + + public int versionMajor() { + return versionMajor; + } + + public int versionMinor() { + return versionMinor; + } + + /** + * Correction time in seconds between UTC and the local + * timezone of the following packet header timestamps. + */ + public int thiszone() { + return thiszone; + } + + /** + * In theory, the accuracy of time stamps in the capture; in + * practice, all tools set it to 0. + */ + public long sigfigs() { + return sigfigs; + } + + /** + * The "snapshot length" for the capture (typically 65535 or + * even more, but might be limited by the user), see: incl_len + * vs. orig_len. + */ + public long snaplen() { + return snaplen; + } + + /** + * Link-layer header type, specifying the type of headers at + * the beginning of the packet. 
+ */ + public long network() { + return network; + } +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Packet.java b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Packet.java new file mode 100644 index 0000000000..086f6675e4 --- /dev/null +++ b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Packet.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.nifi.processors.network.pcap; + +public class Packet { + static final int PACKET_HEADER_LENGTH = 16; + + private ByteBufferReader io; + private long tsSec; + private long tsUsec; + private long inclLen; + private long origLen; + private long expectedLength; + private int totalLength; + private PCAP root; + private byte[] rawBody; + private String invalidityReason; + + public Packet(ByteBufferReader io, PCAP root) { + this.root = root; + this.io = io; + read(); + } + + public Packet(byte[] headerArray, PCAP root) { + this.root = root; + this.io = new ByteBufferReader(headerArray); + read(); + } + + public Packet(long tSSec, long tSUsec, long inclLen, long origLen, byte[] rawBody) { + // packet header properties + this.tsSec = tSSec; + this.tsUsec = tSUsec; + this.inclLen = inclLen; + this.origLen = origLen; + + // packet calculated properties + this.expectedLength = inclLen; + this.totalLength = PACKET_HEADER_LENGTH + rawBody.length; + this.rawBody = rawBody; + this.setValidity(); + } + + private void read() { + this.tsSec = this.io.readU4(); + this.tsUsec = this.io.readU4(); + this.inclLen = this.io.readU4(); + this.origLen = this.io.readU4(); + + this.expectedLength = Math.min(inclLen(), root().getHeader().snaplen()); + + if (this.io.bytesLeft() >= expectedLength) { + this.rawBody = this.io.readBytes(expectedLength); + } else { + this.rawBody = new byte[0]; + } + this.setValidity(); + this.totalLength = PACKET_HEADER_LENGTH + this.rawBody.length; + } + + private void setValidity() { + this.invalidityReason = null; + if (this.rawBody.length == 0) { + this.invalidityReason = "Packet body is empty"; + } else if (this.inclLen > this.origLen) { + this.invalidityReason = "The reported length of this packet exceeds the reported length of the original packet " + + "as sent on the network; (" + this.inclLen + " > " + this.origLen + ")"; + } else if (this.origLen == 0) { + this.invalidityReason = "The reported original length of this packet as send 
on the network is 0."; + } else if (this.inclLen == 0) { + this.invalidityReason = "The reported length of this packet is 0."; + } + } + + public boolean isInvalid() { + return this.invalidityReason != null; + } + + public long tsSec() { + return tsSec; + } + + public long tsUsec() { + return tsUsec; + } + + /** + * Number of bytes of packet data actually captured and saved in the file. + */ + public long inclLen() { + return inclLen; + } + + /** + * Length of the packet as it appeared on the network when it was captured. + */ + public long origLen() { + return origLen; + } + + /** + * @see <a href= + * "https://wiki.wireshark.org/Development/LibpcapFileFormat#Packet_Data">Source</a> + */ + public PCAP root() { + return root; + } + + public byte[] rawBody() { + return rawBody; + } + + public long expectedLength() { + return expectedLength; + } + + public int totalLength() { + return totalLength; + } + + public String invalidityReason() { + return invalidityReason; + } + + public void setBody(byte[] newBody) { + this.rawBody = newBody; + this.setValidity(); + this.totalLength = PACKET_HEADER_LENGTH + rawBody.length; + } +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java new file mode 100644 index 0000000000..7074f55223 --- /dev/null +++ b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java @@ -0,0 +1,265 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.network.pcap; + +import org.apache.nifi.components.PropertyDescriptor; +import org.apache.nifi.flowfile.FlowFile; +import org.apache.nifi.annotation.behavior.InputRequirement; +import org.apache.nifi.annotation.behavior.InputRequirement.Requirement; +import org.apache.nifi.annotation.behavior.SideEffectFree; +import org.apache.nifi.annotation.behavior.WritesAttribute; +import org.apache.nifi.annotation.behavior.WritesAttributes; +import org.apache.nifi.annotation.documentation.CapabilityDescription; +import org.apache.nifi.annotation.documentation.Tags; +import org.apache.nifi.processor.AbstractProcessor; +import org.apache.nifi.processor.DataUnit; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.Relationship; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.flowfile.attributes.FragmentAttributes; +import org.apache.nifi.flowfile.attributes.CoreAttributes; +import org.apache.nifi.processor.io.InputStreamCallback; +import org.apache.nifi.stream.io.StreamUtils; + +import java.io.BufferedInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.Set; +import 
java.util.UUID; +import java.util.HashMap; +import java.util.Map; +import java.util.stream.IntStream; + +@SideEffectFree +@InputRequirement(Requirement.INPUT_REQUIRED) +@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark", "TShark", "TcpDump", "WinDump", "sniffers"}) +@CapabilityDescription("Splits one pcap file into multiple pcap files based on a maximum size.") +@WritesAttributes( + { + @WritesAttribute( + attribute = SplitPCAP.ERROR_REASON_LABEL, + description = "The reason the FlowFile was sent to the failure relationship." + ), + @WritesAttribute( + attribute = "fragment.identifier", + description = "All split PCAP FlowFiles produced from the same parent PCAP FlowFile will have the same randomly generated UUID added for this attribute" + ), + @WritesAttribute( + attribute = "fragment.index", + description = "A one-up number that indicates the ordering of the split PCAP FlowFiles that were created from a single parent PCAP FlowFile" + ), + @WritesAttribute( + attribute = "fragment.count", + description = "The number of split PCAP FlowFiles generated from the parent PCAP FlowFile" + ), + @WritesAttribute( + attribute = "segment.original.filename", + description = "The filename of the parent PCAP FlowFile" + ) + } +) +public class SplitPCAP extends AbstractProcessor { + + protected static final String ERROR_REASON_LABEL = "error.reason"; + public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key(); + public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key(); + public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key(); + public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key(); + + public static final PropertyDescriptor PCAP_MAX_SIZE = new PropertyDescriptor + .Builder().name("PCAP Max Size") + .displayName("PCAP Max Size") + .description("Maximum size of each output PCAP file. 
PCAP packets larger than the configured size result in routing FlowFiles to the failure relationship.") + .required(true) + .defaultValue("1 MB") + .addValidator(StandardValidators.DATA_SIZE_VALIDATOR) + .build(); + + public static final Relationship REL_ORIGINAL = new Relationship.Builder() + .name("original") + .description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to " + + "this relationship") + .build(); + + public static final Relationship REL_FAILURE = new Relationship.Builder() + .name("failure") + .description("If a FlowFile cannot be transformed from the configured input format to the configured output format, " + + "the unchanged FlowFile will be routed to this relationship.") + .build(); + + public static final Relationship REL_SPLIT = new Relationship.Builder() + .name("split") + .description("The individual PCAP 'segments' of the original PCAP FlowFile will be routed to this relationship.") + .build(); + + private static final List<PropertyDescriptor> DESCRIPTORS = List.of(PCAP_MAX_SIZE); + + private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_ORIGINAL, REL_FAILURE, REL_SPLIT); + + @Override + public Set<Relationship> getRelationships() { + return RELATIONSHIPS; + } + + @Override + public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { + return DESCRIPTORS; + } + + /** + * This method is called when a trigger event occurs in the processor. + * It processes the incoming flow file, splits it into smaller pcap files based on the PCAP Max Size, + * and transfers the split pcap files to the success relationship. + * If the flow file is empty or not parseable, it is transferred to the failure relationship. 
+ * + * @param context the process context + * @param session the process session + */ + @Override + public void onTrigger(final ProcessContext context, final ProcessSession session) { + + FlowFile originalFlowFile = session.get(); + if (originalFlowFile == null) { + return; + } + final int pcapMaxSize = context.getProperty(PCAP_MAX_SIZE.getName()).asDataSize(DataUnit.B).intValue(); + final PCAPStreamSplitterCallback callback = new PCAPStreamSplitterCallback(session, originalFlowFile, pcapMaxSize); + + try { + session.read(originalFlowFile, callback); + } catch (ProcessException e) { + getLogger().error("Failed to split {}", originalFlowFile, e); + session.remove(callback.getSplitFiles()); + session.putAttribute(originalFlowFile, ERROR_REASON_LABEL, e.getMessage()); + session.transfer(originalFlowFile, REL_FAILURE); + return; + } + + final String fragmentId = UUID.randomUUID().toString(); + final String originalFileName = originalFlowFile.getAttribute(CoreAttributes.FILENAME.key()); + final String originalFileNameWithoutExtension = originalFileName.substring(0, originalFileName.lastIndexOf(".")); + + final List<FlowFile> splitFiles = callback.getSplitFiles(); + final Map<String, String> attributes = new HashMap<>(); + attributes.put(FRAGMENT_COUNT, String.valueOf(splitFiles.size())); + attributes.put(FRAGMENT_ID, fragmentId); + attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName); + + IntStream.range(0, splitFiles.size()).forEach(index -> { + FlowFile split = splitFiles.get(index); + attributes.put(CoreAttributes.FILENAME.key(), "%s-%d.pcap".formatted(originalFileNameWithoutExtension, index)); + attributes.put(FRAGMENT_INDEX, Integer.toString(index)); + session.transfer(session.putAllAttributes(split, attributes), REL_SPLIT); + }); + session.transfer(originalFlowFile, REL_ORIGINAL); + } + + protected static class PCAPStreamSplitterCallback implements InputStreamCallback { + private final ProcessSession session; + private final FlowFile originalFlowFile; + 
private final int pcapMaxSize; + private final List<FlowFile> splitFiles = new ArrayList<>(); + + public List<FlowFile> getSplitFiles() { + return splitFiles; + } + + public PCAPStreamSplitterCallback(ProcessSession session, FlowFile flowFile, int pcapMaxSize) { + this.session = session; + this.originalFlowFile = flowFile; + this.pcapMaxSize = pcapMaxSize; + } + + private Packet getNextPacket(final BufferedInputStream bufferedStream, final PCAP templatePcap, final int totalPackets) throws IOException { + final byte[] packetHeader = new byte[Packet.PACKET_HEADER_LENGTH]; + StreamUtils.read(bufferedStream, packetHeader, Packet.PACKET_HEADER_LENGTH); + + final Packet currentPacket = new Packet(packetHeader, templatePcap); + + if (currentPacket.totalLength() > this.pcapMaxSize) { + throw new ProcessException("PCAP Packet length [%d] larger then configured maximum [%d]".formatted(currentPacket.totalLength(), pcapMaxSize)); + } + + final int expectedLength = (int) currentPacket.expectedLength(); + final byte[] packetBody = new byte[expectedLength]; + StreamUtils.read(bufferedStream, packetBody, expectedLength); + currentPacket.setBody(packetBody); + + if (currentPacket.isInvalid()) { + throw new ProcessException("PCAP contains an invalid packet. 
Packet number [%d] is invalid - [%s]".formatted(totalPackets, currentPacket.invalidityReason())); + } + + return currentPacket; + } + + @Override + public void process(final InputStream inStream) throws IOException { + final List<Packet> loadedPackets = new ArrayList<>(); + final BufferedInputStream bufferedStream = new BufferedInputStream(inStream); + int totalPackets = 1; + + if (bufferedStream.available() == 0) { + throw new ProcessException("Input PCAP file empty"); + } + + final byte[] pcapHeader = new byte[PCAPHeader.PCAP_HEADER_LENGTH]; + StreamUtils.read(bufferedStream, pcapHeader, PCAPHeader.PCAP_HEADER_LENGTH); + + int currentPcapTotalLength = PCAPHeader.PCAP_HEADER_LENGTH; + + final PCAP templatePcap = new PCAP(new ByteBufferReader(pcapHeader)); + + while (bufferedStream.available() > 0) { + + Packet currentPacket = getNextPacket(bufferedStream, templatePcap, totalPackets); + + if (currentPcapTotalLength + currentPacket.totalLength() > this.pcapMaxSize) { + + templatePcap.getPackets().addAll(loadedPackets); + FlowFile newFlowFile = session.create(originalFlowFile); + try (final OutputStream out = session.write(newFlowFile)) { + out.write(templatePcap.toByteArray()); + this.splitFiles.add(newFlowFile); + } + + loadedPackets.clear(); + currentPcapTotalLength = PCAPHeader.PCAP_HEADER_LENGTH; + templatePcap.getPackets().clear(); + } + + loadedPackets.add(currentPacket); + totalPackets++; + currentPcapTotalLength += currentPacket.totalLength(); + } + + // If there are any packets left over, create a new flowfile. 
+ if (!loadedPackets.isEmpty()) { + templatePcap.getPackets().addAll(loadedPackets); + FlowFile newFlowFile = session.create(originalFlowFile); + try (final OutputStream out = session.write(newFlowFile)) { + out.write(templatePcap.toByteArray()); + this.splitFiles.add(newFlowFile); + } + } + } + } +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor index a5dfac9f70..c4496962a6 100644 --- a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor +++ b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor @@ -12,4 +12,5 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -org.apache.nifi.processors.network.ParseNetflowv5 \ No newline at end of file +org.apache.nifi.processors.network.ParseNetflowv5 +org.apache.nifi.processors.network.pcap.SplitPCAP \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestPCAP.java b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestPCAP.java new file mode 100644 index 0000000000..a619824cd8 --- /dev/null +++ b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestPCAP.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.network.pcap; + +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestPCAP { + private static final byte[] PACKET_DATA = new byte[]{ + 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, + 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, + 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, + }; + + private static final int[][] PCAP_HEADER_VALUES = new int[][]{ + new int[]{2, 2}, + new int[]{4, 2}, + new int[]{0, 4}, + new int[]{0, 4}, + new int[]{40, 4}, + new int[]{1, 4} + }; + + private static final Map<String, Long> packetHeaderValues = Map.of( + "tsSec", 1713184965L, + "tsUsec", 1000L, + "inclLen", 30L, + "origLen", 30L + ); + + @Test + void testReadBytesFull() { + + // Create a header for the test PCAP + ByteBuffer headerBuffer = ByteBuffer.allocate(PCAPHeader.PCAP_HEADER_LENGTH); + headerBuffer.put(new byte[]{(byte) 0xa1, (byte) 0xb2, (byte) 0xc3, (byte) 0xd4}); + for (int[] value : PCAP_HEADER_VALUES) { + headerBuffer.put(PCAP.readIntToNBytes(value[0], value[1])); + } + PCAPHeader hdr = new PCAPHeader(new 
ByteBufferReader(headerBuffer.array())); + // Create a sample packet + List<Packet> packets = new ArrayList<>(); + packets.add(new Packet( + packetHeaderValues.get("tsSec"), + packetHeaderValues.get("tsUsec"), + packetHeaderValues.get("inclLen"), + packetHeaderValues.get("origLen"), + PACKET_DATA + )); + + // create test PCAP + PCAP testPcap = new PCAP(hdr, packets); + + // Call the readBytesFull method + byte[] result = testPcap.toByteArray(); + + // Assert the expected byte array length + assertEquals(70, result.length); + + // Assert the expected byte array values + ByteBuffer buffer = ByteBuffer.wrap(result); + assertEquals(0xa1b2c3d4, buffer.getInt()); + ByteBuffer litteEndianBuffer = ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN); + litteEndianBuffer.position(4); + + for (int[] value : PCAP_HEADER_VALUES) { + if (value[1] == 2) { + assertEquals(value[0], litteEndianBuffer.getShort()); + } else { + assertEquals(value[0], litteEndianBuffer.getInt()); + } + } + + assertEquals(packetHeaderValues.get("tsSec"), litteEndianBuffer.getInt()); + assertEquals(packetHeaderValues.get("tsUsec"), litteEndianBuffer.getInt()); + assertEquals(packetHeaderValues.get("inclLen"), litteEndianBuffer.getInt()); + assertEquals(packetHeaderValues.get("origLen"), litteEndianBuffer.getInt()); + byte[] body = new byte[30]; + litteEndianBuffer.get(40, body, 0, 30); + assertArrayEquals(body, PACKET_DATA); + } +} \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestSplitPCAP.java b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestSplitPCAP.java new file mode 100644 index 0000000000..bcc5ef7cba --- /dev/null +++ b/nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestSplitPCAP.java @@ -0,0 +1,134 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Exercises the {@link SplitPCAP} processor through a NiFi {@link TestRunner}: each scenario enqueues
+ * one serialized {@link PCAP} and checks how many flow files reach each relationship.
+ */
+class TestSplitPCAP {
+
+    protected static final long PACKET_TIMESTAMP = 1713184965;
+    protected static final long PACKET_INTERVAL = 1000;
+
+    /** Payload shared by every test packet: 30 bytes holding the values 0 through 29. */
+    private static final byte[] PACKET_DATA = new byte[]{
+        0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
+        10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
+        20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
+    };
+
+    private PCAPHeader pcapHeader;
+    private Packet validPacket;
+    private Packet invalidPacket;
+
+    /**
+     * Rebuilds the fixtures before each test: a global header, a packet whose two length arguments
+     * both equal the payload size, and a packet whose length arguments (50 and 10) do not.
+     */
+    @BeforeEach
+    void init() {
+        final byte[] magicNumber = {(byte) 0xa1, (byte) 0xb2, (byte) 0xc3, (byte) 0xd4};
+
+        // Magic number first, then the six header fields encoded to their byte widths
+        final ByteBuffer globalHeader = ByteBuffer.allocate(PCAPHeader.PCAP_HEADER_LENGTH)
+                .put(magicNumber)
+                .put(PCAP.readIntToNBytes(2, 2))
+                .put(PCAP.readIntToNBytes(4, 2))
+                .put(PCAP.readIntToNBytes(0, 4))
+                .put(PCAP.readIntToNBytes(0, 4))
+                .put(PCAP.readIntToNBytes(4000, 4))
+                .put(PCAP.readIntToNBytes(1, 4));
+        pcapHeader = new PCAPHeader(new ByteBufferReader(globalHeader.array()));
+
+        final int payloadLength = PACKET_DATA.length;
+        validPacket = new Packet(PACKET_TIMESTAMP, PACKET_INTERVAL, payloadLength, payloadLength, PACKET_DATA);
+        invalidPacket = new Packet(PACKET_TIMESTAMP, PACKET_INTERVAL, 50, 10, PACKET_DATA);
+    }
+
+    /**
+     * Runs {@link SplitPCAP} once over a PCAP built from the shared header and the given packets.
+     *
+     * @param pcapMaxSize value for the {@code PCAP_MAX_SIZE} processor property
+     * @param packets packets to place in the enqueued PCAP
+     * @param expectedRelations expected number of transferred flow files per relationship
+     */
+    void executeTest(String pcapMaxSize, List<Packet> packets, Map<Relationship, Integer> expectedRelations) {
+        final TestRunner testRunner = TestRunners.newTestRunner(SplitPCAP.class);
+        testRunner.setProperty(SplitPCAP.PCAP_MAX_SIZE, pcapMaxSize);
+
+        final byte[] serializedPcap = new PCAP(pcapHeader, packets).toByteArray();
+        testRunner.enqueue(serializedPcap);
+        testRunner.run();
+
+        expectedRelations.forEach((relationship, count) -> testRunner.assertTransferCount(relationship, count));
+        testRunner.assertQueueEmpty();
+    }
+
+    /** Three valid packets with maximum sizes of 100B and 50B: expects 3 and 4 splits, plus the original. */
+    @Test
+    void testSuccesses() {
+        final List<Packet> threeValidPackets = Collections.nCopies(3, validPacket);
+
+        final Map<Relationship, Integer> expectedAt100Bytes = Map.of(
+            SplitPCAP.REL_SPLIT, 3,
+            SplitPCAP.REL_ORIGINAL, 1
+        );
+        executeTest("100B", threeValidPackets, expectedAt100Bytes);
+
+        final Map<Relationship, Integer> expectedAt50Bytes = Map.of(
+            SplitPCAP.REL_SPLIT, 4,
+            SplitPCAP.REL_ORIGINAL, 1
+        );
+        executeTest("50B", threeValidPackets, expectedAt50Bytes);
+    }
+
+    /**
+     * Each scenario expects exactly one flow file on the failure relationship: only invalid packets,
+     * valid packets with a 10B maximum size, and valid packets followed by one invalid packet.
+     */
+    @Test
+    void testFailures() {
+        final Map<Relationship, Integer> singleFailure = Map.of(SplitPCAP.REL_FAILURE, 1);
+
+        executeTest("50B", Collections.nCopies(3, invalidPacket), singleFailure);
+        executeTest("10B", Collections.nCopies(3, validPacket), singleFailure);
+
+        // nCopies returns an immutable list, so copy it before appending the invalid packet
+        final List<Packet> validThenInvalid = new ArrayList<>(Collections.nCopies(3, validPacket));
+        validThenInvalid.add(invalidPacket);
+        executeTest("50B", validThenInvalid, singleFailure);
+    }
+}
\ No newline at end of file
