exceptionfactory commented on code in PR #8691:
URL: https://github.com/apache/nifi/pull/8691#discussion_r1641603393
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Header.java:
##########
@@ -0,0 +1,130 @@
+// MIT License
+
+// Copyright (c) 2015-2024 Kaitai Project
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+package org.apache.nifi.processors.network.pcap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+public class Header {
+ private static final Logger logger = LoggerFactory.getLogger(Header.class);
+ private ByteBufferInterface io;
+ private byte[] magicNumber;
+ private int versionMajor;
+ private int versionMinor;
+ private int thiszone;
+ private long sigfigs;
+ private long snaplen;
+ private long network;
+
+ public Header(ByteBufferInterface io) {
+ this.io = io;
+ try {
+ read();
+ } catch (IllegalArgumentException e) {
+ logger.error("PCAP file header could not be parsed", e);
Review Comment:
Logging an error here does not impact behavior. As this is only catching an
`IllegalArgumentException`, recommend removing the try-catch block so that the
exception gets passed to the caller and stops further processing.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAP.java:
##########
@@ -0,0 +1,145 @@
+// MIT License
+
+// Copyright (c) 2015-2024 Kaitai Project
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * PCAP (named after libpcap / winpcap) is a popular format for saving
+ * network traffic grabbed by network sniffers. It is typically
+ * produced by tools like [tcpdump](<a
href="https://www.tcpdump.org/">...</a>) or
+ * [Wireshark](<a href="https://www.wireshark.org/">...</a>).
+ *
+ * @see <a href=
+ * "https://wiki.wireshark.org/Development/LibpcapFileFormat">Source</a>
+ */
+public class PCAP {
+ static final int PCAP_HEADER_LENGTH = 24;
+ private ByteBufferInterface io;
+ private Header hdr;
+ private List<Packet> packets;
+
+ public PCAP(ByteBufferInterface io) {
+ this.io = io;
+ read();
+ }
+
+ public PCAP(Header hdr, List<Packet> packets) {
+ this.hdr = hdr;
+ this.packets = packets;
+ }
+
+ public byte[] readBytesFull() {
+
+ int headerBufferSize = 20 + this.hdr().magicNumber().length;
+ ByteBuffer headerBuffer = ByteBuffer.allocate(headerBufferSize);
+ headerBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+ headerBuffer.put(this.hdr().magicNumber());
+ headerBuffer.put(this.readIntToNBytes(this.hdr().versionMajor(), 2,
false));
+ headerBuffer.put(this.readIntToNBytes(this.hdr().versionMinor(), 2,
false));
+ headerBuffer.put(this.readIntToNBytes(this.hdr().thiszone(), 4,
false));
+ headerBuffer.put(this.readLongToNBytes(this.hdr().sigfigs(), 4, true));
+ headerBuffer.put(this.readLongToNBytes(this.hdr().snaplen(), 4, true));
+ headerBuffer.put(this.readLongToNBytes(this.hdr().network(), 4, true));
+
+ List<byte[]> packetByteArrays = new ArrayList<>();
+
+ int packetBufferSize = 0;
+
+ for (Packet currentPacket : packets) {
+ int currentPacketTotalLength = Packet.PACKET_HEADER_LENGTH +
currentPacket.rawBody().length;
+
+ ByteBuffer currentPacketBytes =
ByteBuffer.allocate(currentPacketTotalLength);
+ currentPacketBytes.put(readLongToNBytes(currentPacket.tsSec(), 4,
false));
+ currentPacketBytes.put(readLongToNBytes(currentPacket.tsUsec(), 4,
false));
+ currentPacketBytes.put(readLongToNBytes(currentPacket.inclLen(),
4, false));
+ currentPacketBytes.put(readLongToNBytes(currentPacket.origLen(),
4, false));
+ currentPacketBytes.put(currentPacket.rawBody());
+
+ packetByteArrays.add(currentPacketBytes.array());
+ packetBufferSize += currentPacketTotalLength;
+ }
+
+ ByteBuffer packetBuffer = ByteBuffer.allocate(packetBufferSize);
+ packetBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+ for (byte[] packetByteArray : packetByteArrays) {
+ packetBuffer.put(packetByteArray);
+ }
+
+ ByteBuffer allBytes = ByteBuffer.allocate(headerBufferSize +
packetBufferSize);
+ allBytes.order(ByteOrder.LITTLE_ENDIAN);
+
+ allBytes.put(headerBuffer.array());
+ allBytes.put(packetBuffer.array());
+
+ return allBytes.array();
+ }
+
+ private byte[] readIntToNBytes(int input, int numberOfBytes, boolean
isSigned) {
+ byte[] output = new byte[numberOfBytes];
+ output[0] = (byte) (input & 0xff);
+ for (int loop = 1; loop < numberOfBytes; loop++) {
+ if (isSigned) {
+ output[loop] = (byte) (input >> (8 * loop));
+ } else {
+ output[loop] = (byte) (input >>> (8 * loop));
+ }
+ }
+ return output;
+ }
+
+ private byte[] readLongToNBytes(long input, int numberOfBytes, boolean
isSigned) {
+ byte[] output = new byte[numberOfBytes];
+ output[0] = (byte) (input & 0xff);
+ for (int loop = 1; loop < numberOfBytes; loop++) {
+ if (isSigned) {
+ output[loop] = (byte) (input >> (8 * loop));
+ } else {
+ output[loop] = (byte) (input >>> (8 * loop));
+ }
+ }
+ return output;
+ }
+
+ private void read() {
+ this.hdr = new Header(this.io);
+ this.packets = new ArrayList<>();
+ while (!this.io.isEof()) {
+ this.packets.add(new Packet(this.io, this));
+ }
+ }
+
+ public Header hdr() {
Review Comment:
Recommend renaming this to `getHeader()`
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestPCAP.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestPCAP {
+ @Test
+ public void testReadBytesFull() {
+
+ // Create a header for the test PCAP
+ Header hdr = new Header(
+ new byte[]{(byte) 0xa1, (byte) 0xb2, (byte) 0xc3, (byte) 0xd4},
+ 2,
+ 4,
+ 0,
+ (long) 0,
+ (long) 40,
+ (long) 1 // ETHERNET
+ );
+
+ // Create a sample packet
+ List<Packet> packets = new ArrayList<>();
+ packets.add(new Packet(
+ (long) 1713184965,
+ (long) 1000,
+ (long) 30,
+ (long) 30,
+ new byte[]{
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
+ 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
+ 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
+ }));
+
+ // create test PCAP
+
+ PCAP testPcap = new PCAP(hdr, packets);
+
+ // Call the readBytesFull method
+ byte[] result = testPcap.readBytesFull();
+
+ // Assert the expected byte array length
+ assertEquals(70, result.length);
+
+ // Assert the expected byte array values
+ ByteBuffer buffer = ByteBuffer.wrap(result);
+ assertEquals(0xa1b2c3d4, buffer.getInt());
+ ByteBuffer LEBuffer =
ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
Review Comment:
```suggestion
        ByteBuffer littleEndianBuffer =
ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
```
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Header.java:
##########
@@ -0,0 +1,130 @@
+// MIT License
+
+// Copyright (c) 2015-2024 Kaitai Project
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+package org.apache.nifi.processors.network.pcap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+public class Header {
+ private static final Logger logger = LoggerFactory.getLogger(Header.class);
+ private ByteBufferInterface io;
+ private byte[] magicNumber;
+ private int versionMajor;
+ private int versionMinor;
+ private int thiszone;
+ private long sigfigs;
+ private long snaplen;
+ private long network;
+
+ public Header(ByteBufferInterface io) {
+ this.io = io;
+ try {
+ read();
+ } catch (IllegalArgumentException e) {
+ logger.error("PCAP file header could not be parsed", e);
+ }
+ }
+
+ public Header(byte[] magicNumber, int versionMajor, int versionMinor, int
thiszone, long sigfigs, long snaplen,
Review Comment:
Since this version of the constructor is only used in test classes, I
recommend removing it and only having the single constructor. That should
also allow the member variables to be declared `final`.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAP.java:
##########
@@ -0,0 +1,145 @@
+// MIT License
+
+// Copyright (c) 2015-2024 Kaitai Project
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * PCAP (named after libpcap / winpcap) is a popular format for saving
+ * network traffic grabbed by network sniffers. It is typically
+ * produced by tools like [tcpdump](<a
href="https://www.tcpdump.org/">...</a>) or
+ * [Wireshark](<a href="https://www.wireshark.org/">...</a>).
+ *
+ * @see <a href=
+ * "https://wiki.wireshark.org/Development/LibpcapFileFormat">Source</a>
+ */
+public class PCAP {
+ static final int PCAP_HEADER_LENGTH = 24;
+ private ByteBufferInterface io;
+ private Header hdr;
+ private List<Packet> packets;
+
+ public PCAP(ByteBufferInterface io) {
+ this.io = io;
+ read();
+ }
+
+ public PCAP(Header hdr, List<Packet> packets) {
+ this.hdr = hdr;
+ this.packets = packets;
+ }
+
+ public byte[] readBytesFull() {
+
+ int headerBufferSize = 20 + this.hdr().magicNumber().length;
+ ByteBuffer headerBuffer = ByteBuffer.allocate(headerBufferSize);
+ headerBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+ headerBuffer.put(this.hdr().magicNumber());
+ headerBuffer.put(this.readIntToNBytes(this.hdr().versionMajor(), 2,
false));
+ headerBuffer.put(this.readIntToNBytes(this.hdr().versionMinor(), 2,
false));
+ headerBuffer.put(this.readIntToNBytes(this.hdr().thiszone(), 4,
false));
+ headerBuffer.put(this.readLongToNBytes(this.hdr().sigfigs(), 4, true));
+ headerBuffer.put(this.readLongToNBytes(this.hdr().snaplen(), 4, true));
+ headerBuffer.put(this.readLongToNBytes(this.hdr().network(), 4, true));
+
+ List<byte[]> packetByteArrays = new ArrayList<>();
+
+ int packetBufferSize = 0;
+
+ for (Packet currentPacket : packets) {
+ int currentPacketTotalLength = Packet.PACKET_HEADER_LENGTH +
currentPacket.rawBody().length;
+
+ ByteBuffer currentPacketBytes =
ByteBuffer.allocate(currentPacketTotalLength);
+ currentPacketBytes.put(readLongToNBytes(currentPacket.tsSec(), 4,
false));
+ currentPacketBytes.put(readLongToNBytes(currentPacket.tsUsec(), 4,
false));
+ currentPacketBytes.put(readLongToNBytes(currentPacket.inclLen(),
4, false));
+ currentPacketBytes.put(readLongToNBytes(currentPacket.origLen(),
4, false));
+ currentPacketBytes.put(currentPacket.rawBody());
+
+ packetByteArrays.add(currentPacketBytes.array());
+ packetBufferSize += currentPacketTotalLength;
+ }
+
+ ByteBuffer packetBuffer = ByteBuffer.allocate(packetBufferSize);
+ packetBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+ for (byte[] packetByteArray : packetByteArrays) {
+ packetBuffer.put(packetByteArray);
+ }
+
+ ByteBuffer allBytes = ByteBuffer.allocate(headerBufferSize +
packetBufferSize);
+ allBytes.order(ByteOrder.LITTLE_ENDIAN);
+
+ allBytes.put(headerBuffer.array());
+ allBytes.put(packetBuffer.array());
+
+ return allBytes.array();
+ }
+
+ private byte[] readIntToNBytes(int input, int numberOfBytes, boolean
isSigned) {
+ byte[] output = new byte[numberOfBytes];
+ output[0] = (byte) (input & 0xff);
+ for (int loop = 1; loop < numberOfBytes; loop++) {
+ if (isSigned) {
+ output[loop] = (byte) (input >> (8 * loop));
+ } else {
+ output[loop] = (byte) (input >>> (8 * loop));
+ }
+ }
+ return output;
+ }
+
+ private byte[] readLongToNBytes(long input, int numberOfBytes, boolean
isSigned) {
+ byte[] output = new byte[numberOfBytes];
+ output[0] = (byte) (input & 0xff);
+ for (int loop = 1; loop < numberOfBytes; loop++) {
+ if (isSigned) {
+ output[loop] = (byte) (input >> (8 * loop));
+ } else {
+ output[loop] = (byte) (input >>> (8 * loop));
+ }
+ }
+ return output;
+ }
+
+ private void read() {
+ this.hdr = new Header(this.io);
+ this.packets = new ArrayList<>();
+ while (!this.io.isEof()) {
+ this.packets.add(new Packet(this.io, this));
+ }
+ }
+
+ public Header hdr() {
+ return hdr;
+ }
+
+ public List<Packet> packets() {
Review Comment:
Recommend renaming to `getPackets()`
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/ByteBufferInterface.java:
##########
@@ -0,0 +1,84 @@
+// MIT License
+
+// Copyright (c) 2015-2024 Kaitai Project
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+public class ByteBufferInterface {
Review Comment:
Recommend renaming this to `ByteBufferReader` or `ByteBufferDecoder` since
`Interface` generally has a different connotation.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Header.java:
##########
@@ -0,0 +1,130 @@
+// MIT License
+
+// Copyright (c) 2015-2024 Kaitai Project
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+package org.apache.nifi.processors.network.pcap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+public class Header {
+ private static final Logger logger = LoggerFactory.getLogger(Header.class);
+ private ByteBufferInterface io;
+ private byte[] magicNumber;
+ private int versionMajor;
+ private int versionMinor;
+ private int thiszone;
+ private long sigfigs;
+ private long snaplen;
+ private long network;
+
+ public Header(ByteBufferInterface io) {
+ this.io = io;
+ try {
+ read();
+ } catch (IllegalArgumentException e) {
+ logger.error("PCAP file header could not be parsed", e);
+ }
+ }
+
+ public Header(byte[] magicNumber, int versionMajor, int versionMinor, int
thiszone, long sigfigs, long snaplen,
+ long network) {
+
+ this.magicNumber = magicNumber;
+ this.versionMajor = versionMajor;
+ this.versionMinor = versionMinor;
+ this.thiszone = thiszone;
+ this.sigfigs = sigfigs;
+ this.snaplen = snaplen;
+ this.network = network;
+ }
+
+ public ByteBufferInterface io() {
+ return io;
+ }
+
+ private void read() {
+ this.magicNumber = this.io.readBytes(4);
+ if (Arrays.equals(this.magicNumber, new byte[]{(byte) 0xd4, (byte)
0xc3, (byte) 0xb2, (byte) 0xa1})) {
Review Comment:
The byte array used for comparison should be declared as a static final
variable instead of being constructed on each invocation of this method.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Header.java:
##########
@@ -0,0 +1,130 @@
+// MIT License
+
+// Copyright (c) 2015-2024 Kaitai Project
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+package org.apache.nifi.processors.network.pcap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+
+public class Header {
Review Comment:
Recommend renaming this to `PCAPHeader` for a clearer naming association.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/PCAP.java:
##########
@@ -0,0 +1,145 @@
+// MIT License
+
+// Copyright (c) 2015-2024 Kaitai Project
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * PCAP (named after libpcap / winpcap) is a popular format for saving
+ * network traffic grabbed by network sniffers. It is typically
+ * produced by tools like [tcpdump](<a
href="https://www.tcpdump.org/">...</a>) or
+ * [Wireshark](<a href="https://www.wireshark.org/">...</a>).
+ *
+ * @see <a href=
+ * "https://wiki.wireshark.org/Development/LibpcapFileFormat">Source</a>
+ */
+public class PCAP {
+ static final int PCAP_HEADER_LENGTH = 24;
+ private ByteBufferInterface io;
+ private Header hdr;
+ private List<Packet> packets;
+
+ public PCAP(ByteBufferInterface io) {
+ this.io = io;
+ read();
+ }
+
+ public PCAP(Header hdr, List<Packet> packets) {
+ this.hdr = hdr;
+ this.packets = packets;
+ }
+
+ public byte[] readBytesFull() {
Review Comment:
Recommend renaming this to something like `toByteArray()` or `getEncoded()`
since it serializes the header and packets.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Packet.java:
##########
@@ -0,0 +1,149 @@
+// MIT License
+
+// Copyright (c) 2015-2024 Kaitai Project
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+package org.apache.nifi.processors.network.pcap;
+
+public class Packet {
+ static final int PACKET_HEADER_LENGTH = 16;
+ private ByteBufferInterface io;
+ private long tsSec;
+ private long tsUsec;
+ private long inclLen;
+ private long origLen;
+ private long expectedLength;
+ private long totalLength;
+ private PCAP root;
+ private byte[] rawBody;
+ private String invalidityReason;
+
+ public Packet(ByteBufferInterface io, PCAP root) {
+
+ this.root = root;
+ this.io = io;
+ this.invalidityReason = null;
+ read();
+ }
+
+ public Packet(long tSSec, long tSUsec, long inclLen, long origLen, byte[]
rawBody) {
+
+ this.tsSec = tSSec;
+ this.tsUsec = tSUsec;
+ this.inclLen = inclLen;
+ this.origLen = origLen;
+
+ this.expectedLength = inclLen();
+
+ this.totalLength = PACKET_HEADER_LENGTH + rawBody.length;
+ this.rawBody = rawBody;
+ this.invalidityReason = null;
+ }
+
+ public Packet(byte[] headerArray, PCAP root) {
+ this.root = root;
+ this.io = new ByteBufferInterface(headerArray);
+ this.invalidityReason = null;
+ read();
+ }
+
+ private void read() {
+ this.tsSec = this.io.readU4le();
+ this.tsUsec = this.io.readU4le();
+ this.inclLen = this.io.readU4le();
+ this.origLen = this.io.readU4le();
+
+ this.expectedLength = Math.min(inclLen(), root().hdr().snaplen());
+
+ if (!this.io.isEof() && this.io.bytesLeft() >= expectedLength) {
+ this.rawBody = this.io.readBytes(expectedLength);
+ } else {
+ this.rawBody = new byte[0];
+ }
+ this.totalLength = PACKET_HEADER_LENGTH + this.rawBody.length;
+ }
+
+ public boolean isInvalid() {
+ if (this.rawBody.length == 0) {
+ this.invalidityReason = "Packet body is empty";
+ }
+ if (this.inclLen > this.origLen) {
+ this.invalidityReason = "inclLen > origLen (" + this.inclLen + " >
" + this.origLen + ")";
+ }
+ if (this.origLen == 0) {
+ this.invalidityReason = "origLen == 0";
+ }
+ if (this.inclLen == 0) {
+ this.invalidityReason = "inclLen == 0";
+ }
+ return this.invalidityReason != null;
+ }
+
+ public long tsSec() {
+ return tsSec;
+ }
+
+ public long tsUsec() {
+ return tsUsec;
+ }
+
+ /**
+ * Number of bytes of packet data actually captured and saved in the file.
+ */
+ public long inclLen() {
+ return inclLen;
+ }
+
+ /**
+ * Length of the packet as it appeared on the network when it was captured.
+ */
+ public long origLen() {
+ return origLen;
+ }
+
+ /**
+ * @see <a href=
+ *
"https://wiki.wireshark.org/Development/LibpcapFileFormat#Packet_Data">Source</a>
+ */
+ public PCAP root() {
+ return root;
+ }
+
+ public byte[] rawBody() {
+ return rawBody;
+ }
+
+ public int expectedLength() {
+ return (int) expectedLength;
+ }
+
+ public int totalLength() {
+ return (int) totalLength;
+ }
Review Comment:
Recommend changing the return type to `long` instead of casting, so that the
caller can do a cast only if needed.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Packet.java:
##########
@@ -0,0 +1,149 @@
+// MIT License
+
+// Copyright (c) 2015-2024 Kaitai Project
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+package org.apache.nifi.processors.network.pcap;
+
+public class Packet {
+ static final int PACKET_HEADER_LENGTH = 16;
+ private ByteBufferInterface io;
+ private long tsSec;
+ private long tsUsec;
+ private long inclLen;
+ private long origLen;
+ private long expectedLength;
+ private long totalLength;
+ private PCAP root;
+ private byte[] rawBody;
+ private String invalidityReason;
+
+ public Packet(ByteBufferInterface io, PCAP root) {
+
+ this.root = root;
+ this.io = io;
+ this.invalidityReason = null;
+ read();
+ }
+
+ public Packet(long tSSec, long tSUsec, long inclLen, long origLen, byte[]
rawBody) {
+
+ this.tsSec = tSSec;
+ this.tsUsec = tSUsec;
+ this.inclLen = inclLen;
+ this.origLen = origLen;
+
+ this.expectedLength = inclLen();
+
+ this.totalLength = PACKET_HEADER_LENGTH + rawBody.length;
+ this.rawBody = rawBody;
+ this.invalidityReason = null;
+ }
+
+ public Packet(byte[] headerArray, PCAP root) {
+ this.root = root;
+ this.io = new ByteBufferInterface(headerArray);
+ this.invalidityReason = null;
+ read();
+ }
+
+ private void read() {
+ this.tsSec = this.io.readU4le();
+ this.tsUsec = this.io.readU4le();
+ this.inclLen = this.io.readU4le();
+ this.origLen = this.io.readU4le();
+
+ this.expectedLength = Math.min(inclLen(), root().hdr().snaplen());
+
+ if (!this.io.isEof() && this.io.bytesLeft() >= expectedLength) {
+ this.rawBody = this.io.readBytes(expectedLength);
+ } else {
+ this.rawBody = new byte[0];
+ }
+ this.totalLength = PACKET_HEADER_LENGTH + this.rawBody.length;
+ }
+
+ public boolean isInvalid() {
+ if (this.rawBody.length == 0) {
+ this.invalidityReason = "Packet body is empty";
+ }
+ if (this.inclLen > this.origLen) {
+ this.invalidityReason = "inclLen > origLen (" + this.inclLen + " >
" + this.origLen + ")";
+ }
+ if (this.origLen == 0) {
+ this.invalidityReason = "origLen == 0";
+ }
+ if (this.inclLen == 0) {
+ this.invalidityReason = "inclLen == 0";
Review Comment:
The variable naming is not very intuitive for an error message, as `inclLen`
is not clear from an end user point of view. Recommend revisiting each of these
messages and spelling them out in more readable terms.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/Packet.java:
##########
@@ -0,0 +1,149 @@
+// MIT License
+
+// Copyright (c) 2015-2024 Kaitai Project
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
all
+// copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE
+// SOFTWARE.
+
+package org.apache.nifi.processors.network.pcap;
+
+// One captured packet record of a PCAP file: a fixed 16-byte record header
+// (timestamps and lengths) followed by the raw body bytes.
+public class Packet {
+ // Size in bytes of the per-packet record header.
+ static final int PACKET_HEADER_LENGTH = 16;
+ private ByteBufferInterface io;
+ private long tsSec; // capture timestamp, seconds part
+ private long tsUsec; // capture timestamp, sub-second part
+ private long inclLen; // number of body bytes saved in the capture
+ private long origLen; // original on-the-wire packet length
+ private long expectedLength;
+ private long totalLength; // header length plus actual body length
+ private PCAP root; // parent PCAP; supplies the global file header
+ private byte[] rawBody;
+ private String invalidityReason;
+
+ // Parses one packet record from the supplied reader, using the parent
+ // PCAP's header (snaplen) to bound the captured body length.
+ public Packet(ByteBufferInterface io, PCAP root) {
+
+ this.root = root;
+ this.io = io;
+ this.invalidityReason = null;
+ read();
+ }
+
+ // Builds a packet from explicit header field values and a raw body,
+ // for callers that assemble packets rather than parse a stream.
+ // tSSec/tSUsec: capture timestamp parts; inclLen: captured byte count;
+ // origLen: original on-the-wire length. No snaplen clamp is applied,
+ // unlike the stream-parsing path in read().
+ public Packet(long tSSec, long tSUsec, long inclLen, long origLen, byte[]
rawBody) {
+
+ this.tsSec = tSSec;
+ this.tsUsec = tSUsec;
+ this.inclLen = inclLen;
+ this.origLen = origLen;
+
+ // Expected body length is taken directly from the captured length.
+ this.expectedLength = inclLen();
+
+ this.totalLength = PACKET_HEADER_LENGTH + rawBody.length;
+ this.rawBody = rawBody;
+ this.invalidityReason = null;
+ }
+
+ // Parses a packet from a raw byte array by wrapping it in a
+ // ByteBufferInterface and delegating to read().
+ public Packet(byte[] headerArray, PCAP root) {
+ this.root = root;
+ this.io = new ByteBufferInterface(headerArray);
+ this.invalidityReason = null;
+ read();
+ }
+
+ // Reads the 16-byte record header (four unsigned 32-bit little-endian
+ // fields: seconds, sub-seconds, captured length, original length) and
+ // then the packet body from the underlying reader.
+ private void read() {
+ this.tsSec = this.io.readU4le();
+ this.tsUsec = this.io.readU4le();
+ this.inclLen = this.io.readU4le();
+ this.origLen = this.io.readU4le();
+
+ // Body length is the captured length clamped to the file header's snaplen.
+ this.expectedLength = Math.min(inclLen(), root().hdr().snaplen());
+
+ // Only read the body when enough bytes remain; otherwise leave it empty
+ // (an empty body is later reported as invalid by isInvalid()).
+ if (!this.io.isEof() && this.io.bytesLeft() >= expectedLength) {
+ this.rawBody = this.io.readBytes(expectedLength);
+ } else {
+ this.rawBody = new byte[0];
+ }
+ this.totalLength = PACKET_HEADER_LENGTH + this.rawBody.length;
+ }
+
+ public boolean isInvalid() {
Review Comment:
Having a status-checking method with a side effect that sets an invalidity
reason is not great from a design point of view. Instead, I recommend checking
the validity status in the constructor and setting the fields there. Since the
body can be changed, the status would also need to be recomputed in the
`setBody()` method.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark",
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits a pcap file into multiple pcap files based on a
maximum size.")
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = SplitPCAP.ERROR_REASON_LABEL,
+ description = "The reason the flowfile was sent to the failure
relationship."
+ ),
+ @WritesAttribute(
+ attribute = "fragment.identifier",
+ description = "All split PCAP FlowFiles produced from the same parent
PCAP FlowFile will have the same randomly generated UUID added for this
attribute"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of the
split PCAP FlowFiles that were created from a single parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.count",
+ description = "The number of split PCAP FlowFiles generated from the
parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "segment.original.filename ",
+ description = "The filename of the parent PCAP FlowFile"
+ )
+})
+
+public class SplitPCAP extends AbstractProcessor {
+
+ protected static final String ERROR_REASON_LABEL = "ERROR_REASON";
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT =
FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
+ public static final PropertyDescriptor PCAP_MAX_SIZE = new
PropertyDescriptor
+ .Builder().name("PCAP Max Size")
+ .displayName("PCAP Max Size")
+ .description("Maximum size of the output pcap file in bytes.
Defaults to 1MB (1000000)")
+ .required(true)
+ .defaultValue("1MB")
Review Comment:
```suggestion
.defaultValue("1 MB")
```
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark",
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits a pcap file into multiple pcap files based on a
maximum size.")
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = SplitPCAP.ERROR_REASON_LABEL,
+ description = "The reason the flowfile was sent to the failure
relationship."
+ ),
+ @WritesAttribute(
+ attribute = "fragment.identifier",
+ description = "All split PCAP FlowFiles produced from the same parent
PCAP FlowFile will have the same randomly generated UUID added for this
attribute"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of the
split PCAP FlowFiles that were created from a single parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.count",
+ description = "The number of split PCAP FlowFiles generated from the
parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "segment.original.filename ",
+ description = "The filename of the parent PCAP FlowFile"
+ )
+})
+
+public class SplitPCAP extends AbstractProcessor {
+
+ protected static final String ERROR_REASON_LABEL = "ERROR_REASON";
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT =
FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
+ public static final PropertyDescriptor PCAP_MAX_SIZE = new
PropertyDescriptor
+ .Builder().name("PCAP Max Size")
+ .displayName("PCAP Max Size")
+ .description("Maximum size of the output pcap file in bytes.
Defaults to 1MB (1000000)")
Review Comment:
Repeating the default value in the description is not necessary.
```suggestion
.description("Maximum size of each output PCAP file. PCAP
packets larger than the configured size result in routing FlowFiles to the
failure relationship.")
```
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark",
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits a pcap file into multiple pcap files based on a
maximum size.")
Review Comment:
```suggestion
@CapabilityDescription("Splits one PCAP file into multiple PCAP files based
on configured properties")
```
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestPCAP.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestPCAP {
+ @Test
+ public void testReadBytesFull() {
+
+ // Create a header for the test PCAP
+ Header hdr = new Header(
+ new byte[]{(byte) 0xa1, (byte) 0xb2, (byte) 0xc3, (byte) 0xd4},
+ 2,
+ 4,
+ 0,
+ (long) 0,
+ (long) 40,
+ (long) 1 // ETHERNET
+ );
+
+ // Create a sample packet
+ List<Packet> packets = new ArrayList<>();
+ packets.add(new Packet(
+ (long) 1713184965,
+ (long) 1000,
+ (long) 30,
+ (long) 30,
+ new byte[]{
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
+ 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
+ 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
+ }));
+
+ // create test PCAP
+
+ PCAP testPcap = new PCAP(hdr, packets);
+
+ // Call the readBytesFull method
+ byte[] result = testPcap.readBytesFull();
+
+ // Assert the expected byte array length
+ assertEquals(70, result.length);
+
+ // Assert the expected byte array values
+ ByteBuffer buffer = ByteBuffer.wrap(result);
+ assertEquals(0xa1b2c3d4, buffer.getInt());
+ ByteBuffer LEBuffer =
ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
+ LEBuffer.position(4);
+ assertEquals(2, LEBuffer.getShort());
+ assertEquals(4, LEBuffer.getShort());
+ assertEquals(0, LEBuffer.getInt());
+ assertEquals(0, LEBuffer.getInt());
+ assertEquals(40, LEBuffer.getInt());
+ assertEquals(1, LEBuffer.getInt());
+ assertEquals(1713184965, LEBuffer.getInt());
+ assertEquals(1000, LEBuffer.getInt());
+ assertEquals(30, LEBuffer.getInt());
+ assertEquals(30, LEBuffer.getInt());
+ byte[] body = new byte[30];
+ LEBuffer.get(40, body, 0, 30).array();
+ assertArrayEquals(new byte[]{
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
+ 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
+ 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
Review Comment:
This byte array should be declared as a static final value and reused in all
places in this test class.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestSplitPCAP.java:
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.processors.network.pcap;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+
+
+class TestSplitPCAP {
+
+ private Header hdr;
+ private Packet validPacket;
+ private Packet invalidPacket;
+
+ @BeforeEach
+ void init() {
+ // Create a header for the test PCAP
+ this.hdr = new Header(
+ new byte[]{(byte) 0xa1, (byte) 0xb2, (byte) 0xc3, (byte) 0xd4},
+ 2,
+ 4,
+ 0,
+ (long) 0,
+ (long) 40,
+ (long) 1 // ETHERNET
Review Comment:
Are these casts to `long` required?
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/test/java/org/apache/nifi/processors/network/pcap/TestPCAP.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.nifi.processors.network.pcap;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestPCAP {
+ @Test
+ public void testReadBytesFull() {
+
+ // Create a header for the test PCAP
+ Header hdr = new Header(
+ new byte[]{(byte) 0xa1, (byte) 0xb2, (byte) 0xc3, (byte) 0xd4},
+ 2,
+ 4,
+ 0,
+ (long) 0,
+ (long) 40,
+ (long) 1 // ETHERNET
+ );
+
+ // Create a sample packet
+ List<Packet> packets = new ArrayList<>();
+ packets.add(new Packet(
+ (long) 1713184965,
+ (long) 1000,
+ (long) 30,
+ (long) 30,
+ new byte[]{
+ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9,
+ 10, 11, 12, 13, 14, 15, 16, 17, 18, 19,
+ 20, 21, 22, 23, 24, 25, 26, 27, 28, 29,
+ }));
+
+ // create test PCAP
+
+ PCAP testPcap = new PCAP(hdr, packets);
+
+ // Call the readBytesFull method
+ byte[] result = testPcap.readBytesFull();
+
+ // Assert the expected byte array length
+ assertEquals(70, result.length);
+
+ // Assert the expected byte array values
+ ByteBuffer buffer = ByteBuffer.wrap(result);
+ assertEquals(0xa1b2c3d4, buffer.getInt());
+ ByteBuffer LEBuffer =
ByteBuffer.wrap(result).order(ByteOrder.LITTLE_ENDIAN);
+ LEBuffer.position(4);
+ assertEquals(2, LEBuffer.getShort());
+ assertEquals(4, LEBuffer.getShort());
+ assertEquals(0, LEBuffer.getInt());
+ assertEquals(0, LEBuffer.getInt());
+ assertEquals(40, LEBuffer.getInt());
+ assertEquals(1, LEBuffer.getInt());
+ assertEquals(1713184965, LEBuffer.getInt());
+ assertEquals(1000, LEBuffer.getInt());
+ assertEquals(30, LEBuffer.getInt());
+ assertEquals(30, LEBuffer.getInt());
Review Comment:
Many of these values should be declared as static final members and reused in
both the packet construction and the assertions.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark",
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits a pcap file into multiple pcap files based on a
maximum size.")
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = SplitPCAP.ERROR_REASON_LABEL,
+ description = "The reason the flowfile was sent to the failure
relationship."
+ ),
+ @WritesAttribute(
+ attribute = "fragment.identifier",
+ description = "All split PCAP FlowFiles produced from the same parent
PCAP FlowFile will have the same randomly generated UUID added for this
attribute"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of the
split PCAP FlowFiles that were created from a single parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.count",
+ description = "The number of split PCAP FlowFiles generated from the
parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "segment.original.filename ",
+ description = "The filename of the parent PCAP FlowFile"
+ )
+})
+
+public class SplitPCAP extends AbstractProcessor {
+
+ // Attribute set on FlowFiles routed to failure, carrying the error cause.
+ protected static final String ERROR_REASON_LABEL = "ERROR_REASON";
+ // Standard fragment attribute keys shared by NiFi split/merge processors.
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT =
FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
+ // Maximum size of each split output file. Uses NiFi's data-size syntax
+ // (e.g. "1 MB"); validated by DATA_SIZE_VALIDATOR.
+ public static final PropertyDescriptor PCAP_MAX_SIZE = new PropertyDescriptor
+         .Builder().name("PCAP Max Size")
+         .displayName("PCAP Max Size")
+         .description("Maximum size of each output PCAP file. PCAP packets larger than the configured size result in "
+                 + "routing FlowFiles to the failure relationship.")
+         .required(true)
+         .defaultValue("1 MB")
+         .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+         .build();
+
+ // The parent FlowFile, emitted only after a fully successful split.
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+         .name("original")
+         .description("The original FlowFile that was split into segments. If the FlowFile fails processing, nothing will be sent to "
+                 + "this relationship")
+         .build();
+ // NOTE(review): the previous description was copied from a format-conversion
+ // processor and did not describe this splitter; reworded to match behavior.
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+         .name("failure")
+         .description("If a FlowFile is empty, cannot be parsed as a PCAP file, or contains a packet larger than the configured "
+                 + "maximum size, the unchanged FlowFile will be routed to this relationship.")
+         .build();
+ // The split PCAP files produced from the parent FlowFile.
+ public static final Relationship REL_SPLIT = new Relationship.Builder()
+         .name("split")
+         .description("The individual PCAP 'segments' of the original PCAP FlowFile will be routed to this relationship.")
+         .build();
+
+ private static final List<PropertyDescriptor> DESCRIPTORS = List.of(PCAP_MAX_SIZE);
+ private static final Set<Relationship> RELATIONSHIPS = Set.of(REL_ORIGINAL, REL_FAILURE, REL_SPLIT);
+
+ // Exposes the processor's fixed relationship set (original/failure/split).
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ // Exposes the processor's single supported property (PCAP Max Size).
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ /**
+ * This method is called when a trigger event occurs in the processor.
+ * It processes the incoming flow file, splits it into smaller pcap files
based on the PCAP Max Size,
+ * and transfers the split pcap files to the success relationship.
+ * If the flow file is empty or not parseable, it is transferred to the
failure relationship.
+ *
+ * @param context the process context
+ * @param session the process session
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final List<FlowFile> splitFiles = new ArrayList<>();
+
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream inStream) throws
IOException {
+ final int pcapMaxSize =
context.getProperty(PCAP_MAX_SIZE.getName()).asDataSize(DataUnit.B).intValue();
+ final List<Packet> loadedPackets = new ArrayList<>();
+ final BufferedInputStream bInStream = new
BufferedInputStream(inStream);
+ int totalPackets = 0;
+
+ if ( bInStream.available() == 0 ) {
Review Comment:
```suggestion
if (bInStream.available() == 0) {
```
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark",
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits a pcap file into multiple pcap files based on a
maximum size.")
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = SplitPCAP.ERROR_REASON_LABEL,
+ description = "The reason the flowfile was sent to the failure
relationship."
+ ),
+ @WritesAttribute(
+ attribute = "fragment.identifier",
+ description = "All split PCAP FlowFiles produced from the same parent
PCAP FlowFile will have the same randomly generated UUID added for this
attribute"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of the
split PCAP FlowFiles that were created from a single parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.count",
+ description = "The number of split PCAP FlowFiles generated from the
parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "segment.original.filename ",
+ description = "The filename of the parent PCAP FlowFile"
+ )
+})
+
+public class SplitPCAP extends AbstractProcessor {
+
+ protected static final String ERROR_REASON_LABEL = "ERROR_REASON";
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT =
FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
+ public static final PropertyDescriptor PCAP_MAX_SIZE = new
PropertyDescriptor
+ .Builder().name("PCAP Max Size")
+ .displayName("PCAP Max Size")
+ .description("Maximum size of the output pcap file in bytes.
Defaults to 1MB (1000000)")
+ .required(true)
+ .defaultValue("1MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("The original FlowFile that was split into segments.
If the FlowFile fails processing, nothing will be sent to "
+ + "this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("If a FlowFile cannot be transformed from the
configured input format to the configured output format, "
+ + "the unchanged FlowFile will be routed to this relationship.")
+ .build();
+ public static final Relationship REL_SPLIT = new Relationship.Builder()
+ .name("split")
+ .description("The individual PCAP 'segments' of the original PCAP
FlowFile will be routed to this relationship.")
+ .build();
+
+ private static final List<PropertyDescriptor> DESCRIPTORS =
List.of(PCAP_MAX_SIZE);
+ private static final Set<Relationship> RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_SPLIT);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ /**
+ * This method is called when a trigger event occurs in the processor.
+ * It processes the incoming flow file, splits it into smaller pcap files
based on the PCAP Max Size,
+ * and transfers the split pcap files to the success relationship.
+ * If the flow file is empty or not parseable, it is transferred to the
failure relationship.
+ *
+ * @param context the process context
+ * @param session the process session
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final List<FlowFile> splitFiles = new ArrayList<>();
+
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream inStream) throws
IOException {
+ final int pcapMaxSize =
context.getProperty(PCAP_MAX_SIZE.getName()).asDataSize(DataUnit.B).intValue();
+ final List<Packet> loadedPackets = new ArrayList<>();
+ final BufferedInputStream bInStream = new
BufferedInputStream(inStream);
+ int totalPackets = 0;
+
+ if ( bInStream.available() == 0 ) {
+ throw new ProcessException("PCAP file empty.");
+ }
+
+ final byte[] pcapHeader = new
byte[PCAP.PCAP_HEADER_LENGTH];
+ bInStream.read(pcapHeader, 0, PCAP.PCAP_HEADER_LENGTH);
+ int currentPcapTotalLength = PCAP.PCAP_HEADER_LENGTH;
+
+ final PCAP templatePcap = new PCAP(new
ByteBufferInterface(pcapHeader));
+
+ while (bInStream.available() > 0) {
+
+ byte[] packetHeader = new
byte[Packet.PACKET_HEADER_LENGTH];
+ bInStream.read(packetHeader, 0,
Packet.PACKET_HEADER_LENGTH);
+ Packet currentPacket = new Packet(packetHeader,
templatePcap);
+
+ if (currentPacket.totalLength() > pcapMaxSize) {
+ throw new ProcessException("PCAP contains a packet
larger than the max size.");
+ }
+
+ byte[] packetbody = new byte[(int)
currentPacket.expectedLength()];
+
+ bInStream.read(packetbody, 0, (int)
currentPacket.expectedLength());
+ currentPacket.setBody(packetbody);
+
+ if (currentPacket.isInvalid()) {
+ throw new ProcessException("PCAP contains an
invalid packet. Packet number " + totalPackets + 1 + " is invalid - " +
currentPacket.invalidityReason());
+ }
+
+ if (currentPcapTotalLength +
currentPacket.totalLength() > pcapMaxSize) {
+
+ templatePcap.packets().addAll(loadedPackets);
+ FlowFile newFlowFile = session.create(flowFile);
+ try (final OutputStream out =
session.write(newFlowFile)) {
+ out.write(templatePcap.readBytesFull());
+ splitFiles.add(newFlowFile);
+ }
+
+ loadedPackets.clear();
+ currentPcapTotalLength = PCAP.PCAP_HEADER_LENGTH;
+ templatePcap.packets().clear();
+ }
+
+ loadedPackets.add(currentPacket);
+ totalPackets++;
+ currentPcapTotalLength += currentPacket.totalLength();
+ }
+
+ // If there are any packets left over, create a new
flowfile.
+ if (!loadedPackets.isEmpty()) {
+ templatePcap.packets().addAll(loadedPackets);
+ FlowFile newFlowFile = session.create(flowFile);
+ try (final OutputStream out =
session.write(newFlowFile)) {
+ out.write(templatePcap.readBytesFull());
+ splitFiles.add(newFlowFile);
+ }
+ }
+ }
+ });
+ } catch (ProcessException e) {
+ getLogger().error("Failed to split {}", flowFile, e);
+ session.remove(splitFiles);
+ splitFiles.clear();
+ session.putAttribute(flowFile, ERROR_REASON_LABEL, e.getMessage());
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ final String fragmentId = UUID.randomUUID().toString();
+ final String originalFileName =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final String originalFileNameWithoutExtension =
originalFileName.substring(0, originalFileName.lastIndexOf("."));
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(FRAGMENT_COUNT, String.valueOf(splitFiles.size()));
+ attributes.put(FRAGMENT_ID, fragmentId);
+ attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
+
+ IntStream.range(0, splitFiles.size()).forEach(index -> {
+ FlowFile split = splitFiles.get(index);
+ attributes.put(CoreAttributes.FILENAME.key(),
originalFileNameWithoutExtension + "-" + index + ".pcap");
+ attributes.put(FRAGMENT_INDEX, Integer.toString(index));
+ session.putAllAttributes(split, attributes);
Review Comment:
This call returns an updated reference, which should be used instead of the
previous references. Is there a reason for not setting these attributes in the
main packet processing loop?
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark",
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits a pcap file into multiple pcap files based on a
maximum size.")
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = SplitPCAP.ERROR_REASON_LABEL,
+ description = "The reason the flowfile was sent to the failure
relationship."
+ ),
+ @WritesAttribute(
+ attribute = "fragment.identifier",
+ description = "All split PCAP FlowFiles produced from the same parent
PCAP FlowFile will have the same randomly generated UUID added for this
attribute"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of the
split PCAP FlowFiles that were created from a single parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.count",
+ description = "The number of split PCAP FlowFiles generated from the
parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "segment.original.filename ",
+ description = "The filename of the parent PCAP FlowFile"
+ )
+})
+
+public class SplitPCAP extends AbstractProcessor {
+
+ protected static final String ERROR_REASON_LABEL = "ERROR_REASON";
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT =
FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
+ public static final PropertyDescriptor PCAP_MAX_SIZE = new
PropertyDescriptor
+ .Builder().name("PCAP Max Size")
+ .displayName("PCAP Max Size")
+ .description("Maximum size of the output pcap file in bytes.
Defaults to 1MB (1000000)")
+ .required(true)
+ .defaultValue("1MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("The original FlowFile that was split into segments.
If the FlowFile fails processing, nothing will be sent to "
+ + "this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("If a FlowFile cannot be transformed from the
configured input format to the configured output format, "
+ + "the unchanged FlowFile will be routed to this relationship.")
+ .build();
+ public static final Relationship REL_SPLIT = new Relationship.Builder()
+ .name("split")
+ .description("The individual PCAP 'segments' of the original PCAP
FlowFile will be routed to this relationship.")
+ .build();
+
+ private static final List<PropertyDescriptor> DESCRIPTORS =
List.of(PCAP_MAX_SIZE);
+ private static final Set<Relationship> RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_SPLIT);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ /**
+ * This method is called when a trigger event occurs in the processor.
+ * It processes the incoming flow file, splits it into smaller pcap files
based on the PCAP Max Size,
+ * and transfers the split pcap files to the success relationship.
+ * If the flow file is empty or not parseable, it is transferred to the
failure relationship.
+ *
+ * @param context the process context
+ * @param session the process session
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final List<FlowFile> splitFiles = new ArrayList<>();
+
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream inStream) throws
IOException {
+ final int pcapMaxSize =
context.getProperty(PCAP_MAX_SIZE.getName()).asDataSize(DataUnit.B).intValue();
+ final List<Packet> loadedPackets = new ArrayList<>();
+ final BufferedInputStream bInStream = new
BufferedInputStream(inStream);
Review Comment:
```suggestion
final BufferedInputStream bufferedStream = new
BufferedInputStream(inStream);
```
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark",
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits a pcap file into multiple pcap files based on a
maximum size.")
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = SplitPCAP.ERROR_REASON_LABEL,
+ description = "The reason the flowfile was sent to the failure
relationship."
+ ),
+ @WritesAttribute(
+ attribute = "fragment.identifier",
+ description = "All split PCAP FlowFiles produced from the same parent
PCAP FlowFile will have the same randomly generated UUID added for this
attribute"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of the
split PCAP FlowFiles that were created from a single parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.count",
+ description = "The number of split PCAP FlowFiles generated from the
parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "segment.original.filename ",
+ description = "The filename of the parent PCAP FlowFile"
+ )
+})
+
+public class SplitPCAP extends AbstractProcessor {
+
+ protected static final String ERROR_REASON_LABEL = "ERROR_REASON";
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT =
FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
+ public static final PropertyDescriptor PCAP_MAX_SIZE = new
PropertyDescriptor
+ .Builder().name("PCAP Max Size")
+ .displayName("PCAP Max Size")
+ .description("Maximum size of the output pcap file in bytes.
Defaults to 1MB (1000000)")
+ .required(true)
+ .defaultValue("1MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("The original FlowFile that was split into segments.
If the FlowFile fails processing, nothing will be sent to "
+ + "this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("If a FlowFile cannot be transformed from the
configured input format to the configured output format, "
+ + "the unchanged FlowFile will be routed to this relationship.")
+ .build();
+ public static final Relationship REL_SPLIT = new Relationship.Builder()
+ .name("split")
+ .description("The individual PCAP 'segments' of the original PCAP
FlowFile will be routed to this relationship.")
+ .build();
+
+ private static final List<PropertyDescriptor> DESCRIPTORS =
List.of(PCAP_MAX_SIZE);
+ private static final Set<Relationship> RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_SPLIT);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ /**
+ * This method is called when a trigger event occurs in the processor.
+ * It processes the incoming flow file, splits it into smaller pcap files
based on the PCAP Max Size,
+ * and transfers the split pcap files to the success relationship.
+ * If the flow file is empty or not parseable, it is transferred to the
failure relationship.
+ *
+ * @param context the process context
+ * @param session the process session
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final List<FlowFile> splitFiles = new ArrayList<>();
+
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream inStream) throws
IOException {
+ final int pcapMaxSize =
context.getProperty(PCAP_MAX_SIZE.getName()).asDataSize(DataUnit.B).intValue();
+ final List<Packet> loadedPackets = new ArrayList<>();
+ final BufferedInputStream bInStream = new
BufferedInputStream(inStream);
+ int totalPackets = 0;
+
+ if ( bInStream.available() == 0 ) {
+ throw new ProcessException("PCAP file empty.");
+ }
+
+ final byte[] pcapHeader = new
byte[PCAP.PCAP_HEADER_LENGTH];
+ bInStream.read(pcapHeader, 0, PCAP.PCAP_HEADER_LENGTH);
+ int currentPcapTotalLength = PCAP.PCAP_HEADER_LENGTH;
+
+ final PCAP templatePcap = new PCAP(new
ByteBufferInterface(pcapHeader));
+
+ while (bInStream.available() > 0) {
+
+ byte[] packetHeader = new
byte[Packet.PACKET_HEADER_LENGTH];
+ bInStream.read(packetHeader, 0,
Packet.PACKET_HEADER_LENGTH);
+ Packet currentPacket = new Packet(packetHeader,
templatePcap);
+
+ if (currentPacket.totalLength() > pcapMaxSize) {
Review Comment:
The property description does not make it clear that the maximum size
violation will cause an error in processing. That is worth noting.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark",
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits a pcap file into multiple pcap files based on a
maximum size.")
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = SplitPCAP.ERROR_REASON_LABEL,
+ description = "The reason the flowfile was sent to the failure
relationship."
+ ),
+ @WritesAttribute(
+ attribute = "fragment.identifier",
+ description = "All split PCAP FlowFiles produced from the same parent
PCAP FlowFile will have the same randomly generated UUID added for this
attribute"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of the
split PCAP FlowFiles that were created from a single parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.count",
+ description = "The number of split PCAP FlowFiles generated from the
parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "segment.original.filename ",
+ description = "The filename of the parent PCAP FlowFile"
+ )
+})
+
+public class SplitPCAP extends AbstractProcessor {
+
+ protected static final String ERROR_REASON_LABEL = "ERROR_REASON";
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT =
FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
+ public static final PropertyDescriptor PCAP_MAX_SIZE = new
PropertyDescriptor
+ .Builder().name("PCAP Max Size")
+ .displayName("PCAP Max Size")
+ .description("Maximum size of the output pcap file in bytes.
Defaults to 1MB (1000000)")
+ .required(true)
+ .defaultValue("1MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("The original FlowFile that was split into segments.
If the FlowFile fails processing, nothing will be sent to "
+ + "this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("If a FlowFile cannot be transformed from the
configured input format to the configured output format, "
+ + "the unchanged FlowFile will be routed to this relationship.")
+ .build();
+ public static final Relationship REL_SPLIT = new Relationship.Builder()
+ .name("split")
+ .description("The individual PCAP 'segments' of the original PCAP
FlowFile will be routed to this relationship.")
+ .build();
+
+ private static final List<PropertyDescriptor> DESCRIPTORS =
List.of(PCAP_MAX_SIZE);
+ private static final Set<Relationship> RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_SPLIT);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ /**
+ * This method is called when a trigger event occurs in the processor.
+ * It processes the incoming flow file, splits it into smaller pcap files
based on the PCAP Max Size,
+ * and transfers the split pcap files to the success relationship.
+ * If the flow file is empty or not parseable, it is transferred to the
failure relationship.
+ *
+ * @param context the process context
+ * @param session the process session
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final List<FlowFile> splitFiles = new ArrayList<>();
+
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream inStream) throws
IOException {
Review Comment:
This callback is long and deeply nested. It would be better to break it up
into smaller methods, and probably pull it out into a separate inner class to
avoid the level of nesting.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark",
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits a pcap file into multiple pcap files based on a
maximum size.")
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = SplitPCAP.ERROR_REASON_LABEL,
+ description = "The reason the flowfile was sent to the failure
relationship."
+ ),
+ @WritesAttribute(
+ attribute = "fragment.identifier",
+ description = "All split PCAP FlowFiles produced from the same parent
PCAP FlowFile will have the same randomly generated UUID added for this
attribute"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of the
split PCAP FlowFiles that were created from a single parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.count",
+ description = "The number of split PCAP FlowFiles generated from the
parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "segment.original.filename ",
+ description = "The filename of the parent PCAP FlowFile"
+ )
+})
+
+public class SplitPCAP extends AbstractProcessor {
+
+ protected static final String ERROR_REASON_LABEL = "ERROR_REASON";
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT =
FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
+ public static final PropertyDescriptor PCAP_MAX_SIZE = new
PropertyDescriptor
+ .Builder().name("PCAP Max Size")
+ .displayName("PCAP Max Size")
+ .description("Maximum size of the output pcap file in bytes.
Defaults to 1MB (1000000)")
+ .required(true)
+ .defaultValue("1MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("The original FlowFile that was split into segments.
If the FlowFile fails processing, nothing will be sent to "
+ + "this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("If a FlowFile cannot be transformed from the
configured input format to the configured output format, "
+ + "the unchanged FlowFile will be routed to this relationship.")
+ .build();
+ public static final Relationship REL_SPLIT = new Relationship.Builder()
+ .name("split")
+ .description("The individual PCAP 'segments' of the original PCAP
FlowFile will be routed to this relationship.")
+ .build();
+
+ private static final List<PropertyDescriptor> DESCRIPTORS =
List.of(PCAP_MAX_SIZE);
+ private static final Set<Relationship> RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_SPLIT);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ /**
+ * This method is called when a trigger event occurs in the processor.
+ * It processes the incoming flow file, splits it into smaller pcap files
based on the PCAP Max Size,
+ * and transfers the split pcap files to the split relationship.
+ * If the flow file is empty or not parseable, it is transferred to the
failure relationship.
+ *
+ * @param context the process context
+ * @param session the process session
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final List<FlowFile> splitFiles = new ArrayList<>();
+
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream inStream) throws
IOException {
+ final int pcapMaxSize =
context.getProperty(PCAP_MAX_SIZE.getName()).asDataSize(DataUnit.B).intValue();
+ final List<Packet> loadedPackets = new ArrayList<>();
+ final BufferedInputStream bInStream = new
BufferedInputStream(inStream);
+ int totalPackets = 0;
+
+ if ( bInStream.available() == 0 ) {
+ throw new ProcessException("PCAP file empty.");
+ }
+
+ final byte[] pcapHeader = new
byte[PCAP.PCAP_HEADER_LENGTH];
+ bInStream.read(pcapHeader, 0, PCAP.PCAP_HEADER_LENGTH);
+ int currentPcapTotalLength = PCAP.PCAP_HEADER_LENGTH;
+
+ final PCAP templatePcap = new PCAP(new
ByteBufferInterface(pcapHeader));
+
+ while (bInStream.available() > 0) {
+
+ byte[] packetHeader = new
byte[Packet.PACKET_HEADER_LENGTH];
+ bInStream.read(packetHeader, 0,
Packet.PACKET_HEADER_LENGTH);
+ Packet currentPacket = new Packet(packetHeader,
templatePcap);
+
+ if (currentPacket.totalLength() > pcapMaxSize) {
+ throw new ProcessException("PCAP contains a packet
larger than the max size.");
+ }
+
+ byte[] packetbody = new byte[(int)
currentPacket.expectedLength()];
+
+ bInStream.read(packetbody, 0, (int)
currentPacket.expectedLength());
+ currentPacket.setBody(packetbody);
+
+ if (currentPacket.isInvalid()) {
+ throw new ProcessException("PCAP contains an
invalid packet. Packet number " + totalPackets + 1 + " is invalid - " +
currentPacket.invalidityReason());
+ }
+
+ if (currentPcapTotalLength +
currentPacket.totalLength() > pcapMaxSize) {
+
+ templatePcap.packets().addAll(loadedPackets);
+ FlowFile newFlowFile = session.create(flowFile);
+ try (final OutputStream out =
session.write(newFlowFile)) {
+ out.write(templatePcap.readBytesFull());
+ splitFiles.add(newFlowFile);
+ }
+
+ loadedPackets.clear();
+ currentPcapTotalLength = PCAP.PCAP_HEADER_LENGTH;
+ templatePcap.packets().clear();
+ }
+
+ loadedPackets.add(currentPacket);
+ totalPackets++;
+ currentPcapTotalLength += currentPacket.totalLength();
+ }
+
+ // If there are any packets left over, create a new
flowfile.
+ if (!loadedPackets.isEmpty()) {
+ templatePcap.packets().addAll(loadedPackets);
+ FlowFile newFlowFile = session.create(flowFile);
+ try (final OutputStream out =
session.write(newFlowFile)) {
+ out.write(templatePcap.readBytesFull());
+ splitFiles.add(newFlowFile);
+ }
+ }
+ }
+ });
+ } catch (ProcessException e) {
+ getLogger().error("Failed to split {}", flowFile, e);
+ session.remove(splitFiles);
+ splitFiles.clear();
+ session.putAttribute(flowFile, ERROR_REASON_LABEL, e.getMessage());
+ session.transfer(flowFile, REL_FAILURE);
+ return;
+ }
+
+ final String fragmentId = UUID.randomUUID().toString();
+ final String originalFileName =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
+ final String originalFileNameWithoutExtension =
originalFileName.substring(0, originalFileName.lastIndexOf("."));
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put(FRAGMENT_COUNT, String.valueOf(splitFiles.size()));
+ attributes.put(FRAGMENT_ID, fragmentId);
+ attributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
+
+ IntStream.range(0, splitFiles.size()).forEach(index -> {
+ FlowFile split = splitFiles.get(index);
+ attributes.put(CoreAttributes.FILENAME.key(),
originalFileNameWithoutExtension + "-" + index + ".pcap");
Review Comment:
Recommend using String.format() instead of concatenation for readability of
the filename.
```suggestion
attributes.put(CoreAttributes.FILENAME.key(),
"%s-%d.pcap".formatted(originalFileNameWithoutExtension, index));
```
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark",
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits a pcap file into multiple pcap files based on a
maximum size.")
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = SplitPCAP.ERROR_REASON_LABEL,
+ description = "The reason the flowfile was sent to the failure
relationship."
+ ),
+ @WritesAttribute(
+ attribute = "fragment.identifier",
+ description = "All split PCAP FlowFiles produced from the same parent
PCAP FlowFile will have the same randomly generated UUID added for this
attribute"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of the
split PCAP FlowFiles that were created from a single parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.count",
+ description = "The number of split PCAP FlowFiles generated from the
parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "segment.original.filename ",
+ description = "The filename of the parent PCAP FlowFile"
+ )
+})
+
+public class SplitPCAP extends AbstractProcessor {
+
+ protected static final String ERROR_REASON_LABEL = "ERROR_REASON";
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT =
FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
+ public static final PropertyDescriptor PCAP_MAX_SIZE = new
PropertyDescriptor
+ .Builder().name("PCAP Max Size")
+ .displayName("PCAP Max Size")
+ .description("Maximum size of the output pcap file in bytes.
Defaults to 1MB (1000000)")
+ .required(true)
+ .defaultValue("1MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("The original FlowFile that was split into segments.
If the FlowFile fails processing, nothing will be sent to "
+ + "this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("If a FlowFile cannot be transformed from the
configured input format to the configured output format, "
+ + "the unchanged FlowFile will be routed to this relationship.")
+ .build();
+ public static final Relationship REL_SPLIT = new Relationship.Builder()
+ .name("split")
+ .description("The individual PCAP 'segments' of the original PCAP
FlowFile will be routed to this relationship.")
+ .build();
+
+ private static final List<PropertyDescriptor> DESCRIPTORS =
List.of(PCAP_MAX_SIZE);
+ private static final Set<Relationship> RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_SPLIT);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ /**
+ * This method is called when a trigger event occurs in the processor.
+ * It processes the incoming flow file, splits it into smaller pcap files
based on the PCAP Max Size,
+ * and transfers the split pcap files to the success relationship.
+ * If the flow file is empty or not parseable, it is transferred to the
failure relationship.
+ *
+ * @param context the process context
+ * @param session the process session
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final List<FlowFile> splitFiles = new ArrayList<>();
+
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream inStream) throws
IOException {
+ final int pcapMaxSize =
context.getProperty(PCAP_MAX_SIZE.getName()).asDataSize(DataUnit.B).intValue();
+ final List<Packet> loadedPackets = new ArrayList<>();
+ final BufferedInputStream bInStream = new
BufferedInputStream(inStream);
+ int totalPackets = 0;
+
+ if ( bInStream.available() == 0 ) {
+ throw new ProcessException("PCAP file empty.");
+ }
+
+ final byte[] pcapHeader = new
byte[PCAP.PCAP_HEADER_LENGTH];
+ bInStream.read(pcapHeader, 0, PCAP.PCAP_HEADER_LENGTH);
+ int currentPcapTotalLength = PCAP.PCAP_HEADER_LENGTH;
+
+ final PCAP templatePcap = new PCAP(new
ByteBufferInterface(pcapHeader));
+
+ while (bInStream.available() > 0) {
+
+ byte[] packetHeader = new
byte[Packet.PACKET_HEADER_LENGTH];
+ bInStream.read(packetHeader, 0,
Packet.PACKET_HEADER_LENGTH);
+ Packet currentPacket = new Packet(packetHeader,
templatePcap);
+
+ if (currentPacket.totalLength() > pcapMaxSize) {
+ throw new ProcessException("PCAP contains a packet
larger than the max size.");
+ }
+
+ byte[] packetbody = new byte[(int)
currentPacket.expectedLength()];
+
+ bInStream.read(packetbody, 0, (int)
currentPacket.expectedLength());
+ currentPacket.setBody(packetbody);
+
+ if (currentPacket.isInvalid()) {
+ throw new ProcessException("PCAP contains an
invalid packet. Packet number " + totalPackets + 1 + " is invalid - " +
currentPacket.invalidityReason());
Review Comment:
Recommend using `.formatted()` to construct the message with placeholders.
##########
nifi-extension-bundles/nifi-network-bundle/nifi-network-processors/src/main/java/org/apache/nifi/processors/network/pcap/SplitPCAP.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.network.pcap;
+
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.io.InputStreamCallback;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.IntStream;
+
+@SideEffectFree
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"PCAP", "Splitter", "Network", "Packet", "Capture", "Wireshark",
"TShark", "TcpDump", "WinDump", "sniffers"})
+@CapabilityDescription("Splits a pcap file into multiple pcap files based on a
maximum size.")
+@WritesAttributes({
+ @WritesAttribute(
+ attribute = SplitPCAP.ERROR_REASON_LABEL,
+ description = "The reason the flowfile was sent to the failure
relationship."
+ ),
+ @WritesAttribute(
+ attribute = "fragment.identifier",
+ description = "All split PCAP FlowFiles produced from the same parent
PCAP FlowFile will have the same randomly generated UUID added for this
attribute"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.index",
+ description = "A one-up number that indicates the ordering of the
split PCAP FlowFiles that were created from a single parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "fragment.count",
+ description = "The number of split PCAP FlowFiles generated from the
parent PCAP FlowFile"
+ ),
+ @WritesAttribute(
+ attribute = "segment.original.filename ",
+ description = "The filename of the parent PCAP FlowFile"
+ )
+})
+
+public class SplitPCAP extends AbstractProcessor {
+
+ protected static final String ERROR_REASON_LABEL = "ERROR_REASON";
+ public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
+ public static final String FRAGMENT_INDEX =
FragmentAttributes.FRAGMENT_INDEX.key();
+ public static final String FRAGMENT_COUNT =
FragmentAttributes.FRAGMENT_COUNT.key();
+ public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
+ public static final PropertyDescriptor PCAP_MAX_SIZE = new
PropertyDescriptor
+ .Builder().name("PCAP Max Size")
+ .displayName("PCAP Max Size")
+ .description("Maximum size of the output pcap file in bytes.
Defaults to 1MB (1000000)")
+ .required(true)
+ .defaultValue("1MB")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final Relationship REL_ORIGINAL = new Relationship.Builder()
+ .name("original")
+ .description("The original FlowFile that was split into segments.
If the FlowFile fails processing, nothing will be sent to "
+ + "this relationship")
+ .build();
+ public static final Relationship REL_FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("If a FlowFile cannot be transformed from the
configured input format to the configured output format, "
+ + "the unchanged FlowFile will be routed to this relationship.")
+ .build();
+ public static final Relationship REL_SPLIT = new Relationship.Builder()
+ .name("split")
+ .description("The individual PCAP 'segments' of the original PCAP
FlowFile will be routed to this relationship.")
+ .build();
+
+ private static final List<PropertyDescriptor> DESCRIPTORS =
List.of(PCAP_MAX_SIZE);
+ private static final Set<Relationship> RELATIONSHIPS =
Set.of(REL_ORIGINAL, REL_FAILURE, REL_SPLIT);
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return DESCRIPTORS;
+ }
+
+ /**
+ * This method is called when a trigger event occurs in the processor.
+ * It processes the incoming flow file, splits it into smaller pcap files
based on the PCAP Max Size,
+ * and transfers the split pcap files to the success relationship.
+ * If the flow file is empty or not parseable, it is transferred to the
failure relationship.
+ *
+ * @param context the process context
+ * @param session the process session
+ */
+ @Override
+ public void onTrigger(final ProcessContext context, final ProcessSession
session) {
+
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final List<FlowFile> splitFiles = new ArrayList<>();
+
+ try {
+ session.read(flowFile, new InputStreamCallback() {
+ @Override
+ public void process(final InputStream inStream) throws
IOException {
+ final int pcapMaxSize =
context.getProperty(PCAP_MAX_SIZE.getName()).asDataSize(DataUnit.B).intValue();
+ final List<Packet> loadedPackets = new ArrayList<>();
+ final BufferedInputStream bInStream = new
BufferedInputStream(inStream);
+ int totalPackets = 0;
+
+ if ( bInStream.available() == 0 ) {
+ throw new ProcessException("PCAP file empty.");
+ }
+
+ final byte[] pcapHeader = new
byte[PCAP.PCAP_HEADER_LENGTH];
+ bInStream.read(pcapHeader, 0, PCAP.PCAP_HEADER_LENGTH);
+ int currentPcapTotalLength = PCAP.PCAP_HEADER_LENGTH;
+
+ final PCAP templatePcap = new PCAP(new
ByteBufferInterface(pcapHeader));
+
+ while (bInStream.available() > 0) {
+
+ byte[] packetHeader = new
byte[Packet.PACKET_HEADER_LENGTH];
+ bInStream.read(packetHeader, 0,
Packet.PACKET_HEADER_LENGTH);
+ Packet currentPacket = new Packet(packetHeader,
templatePcap);
+
+ if (currentPacket.totalLength() > pcapMaxSize) {
+ throw new ProcessException("PCAP contains a packet
larger than the max size.");
Review Comment:
Recommend including the packet length in the message:
```suggestion
                        throw new ProcessException("PCAP Packet length [%d] larger than configured maximum [%d]".formatted(currentPacket.totalLength(), pcapMaxSize));
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]