This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 61b5c1a7f5 NIFI-11437 Switched to StreamUtils.fillBuffer() for buffer, Improved EncryptContentPGP Content Type Detection
61b5c1a7f5 is described below
commit 61b5c1a7f517abc25b0297d6b13952560777bb59
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Apr 12 13:00:40 2023 -0500
NIFI-11437 Switched to StreamUtils.fillBuffer() for buffer, Improved EncryptContentPGP Content Type Detection
This closes #7166
Signed-off-by: Paul Grey <[email protected]>
(cherry picked from commit bc5f00a6671c0f3fd9d6c8599d196f414ac47fd9)
---
.../nifi/processors/pgp/EncryptContentPGP.java | 85 +++++++++++-----------
.../processors/pgp/io/EncodingStreamCallback.java | 2 +-
2 files changed, 44 insertions(+), 43 deletions(-)
diff --git a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java
index a45d90d1f0..9b676fd830 100644
--- a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java
+++ b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java
@@ -29,12 +29,12 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.pgp.service.api.PGPPublicKeyService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.io.StreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.pgp.attributes.CompressionAlgorithm;
@@ -57,9 +57,11 @@ import org.bouncycastle.openpgp.operator.bc.BcPGPDataEncryptorBuilder;
import org.bouncycastle.openpgp.operator.bc.BcPublicKeyKeyEncryptionMethodGenerator;
import org.bouncycastle.openpgp.operator.jcajce.JcePBEKeyEncryptionMethodGenerator;
+import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.PushbackInputStream;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.Arrays;
@@ -198,14 +200,10 @@ public class EncryptContentPGP extends AbstractProcessor {
}
try {
- final PacketReadInputStreamCallback packetCallback = new
PacketReadInputStreamCallback();
- session.read(flowFile, packetCallback);
-
final SymmetricKeyAlgorithm symmetricKeyAlgorithm =
getSymmetricKeyAlgorithm(context);
final FileEncoding fileEncoding = getFileEncoding(context);
final CompressionAlgorithm compressionAlgorithm =
getCompressionAlgorithm(context);
- final StreamCallback callback = getEncryptStreamCallback(context,
flowFile, symmetricKeyAlgorithm,
- compressionAlgorithm, fileEncoding,
packetCallback.packetFound);
+ final StreamCallback callback = getEncryptStreamCallback(context,
flowFile, symmetricKeyAlgorithm, compressionAlgorithm, fileEncoding);
flowFile = session.write(flowFile, callback);
final Map<String, String> attributes =
getAttributes(symmetricKeyAlgorithm, fileEncoding, compressionAlgorithm);
@@ -267,8 +265,7 @@ public class EncryptContentPGP extends AbstractProcessor {
final FlowFile flowFile,
final
SymmetricKeyAlgorithm symmetricKeyAlgorithm,
final CompressionAlgorithm
compressionAlgorithm,
- final FileEncoding
fileEncoding,
- final boolean packetFound)
{
+ final FileEncoding
fileEncoding) {
final SecureRandom secureRandom = new SecureRandom();
final PGPDataEncryptorBuilder dataEncryptorBuilder = new
BcPGPDataEncryptorBuilder(symmetricKeyAlgorithm.getId())
.setSecureRandom(secureRandom)
@@ -278,7 +275,7 @@ public class EncryptContentPGP extends AbstractProcessor {
methodGenerators.forEach(encryptedDataGenerator::addMethod);
final String filename =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
- return new EncryptStreamCallback(fileEncoding, compressionAlgorithm,
filename, packetFound, encryptedDataGenerator);
+ return new EncryptStreamCallback(fileEncoding, compressionAlgorithm,
filename, getLogger(), encryptedDataGenerator);
}
private List<PGPKeyEncryptionMethodGenerator>
getEncryptionMethodGenerators(final ProcessContext context,
@@ -338,43 +335,18 @@ public class EncryptContentPGP extends AbstractProcessor {
return attributes;
}
- private class PacketReadInputStreamCallback implements InputStreamCallback
{
- private boolean packetFound;
-
- /**
- * Process Input Stream and attempt to read OpenPGP Packet for content
detection
- *
- * @param inputStream Input Stream to be read
- */
- @Override
- public void process(final InputStream inputStream) {
- try {
- final InputStream decodedInputStream =
PGPUtil.getDecoderStream(inputStream);
- final BCPGInputStream packetInputStream = new
BCPGInputStream(decodedInputStream);
- final Packet packet = packetInputStream.readPacket();
- if (packet == null) {
- getLogger().debug("PGP Packet not found");
- } else {
- packetFound = true;
- }
- } catch (final IOException e) {
- getLogger().debug("PGP Packet read failed", e);
- }
- }
- }
-
private static class EncryptStreamCallback extends EncodingStreamCallback {
- private final boolean packetFound;
-
private final PGPEncryptedDataGenerator encryptedDataGenerator;
+ private final ComponentLog logger;
+
public EncryptStreamCallback(final FileEncoding fileEncoding,
final CompressionAlgorithm
compressionAlgorithm,
final String filename,
- final boolean packetFound,
+ final ComponentLog logger,
final PGPEncryptedDataGenerator
encryptedDataGenerator) {
super(fileEncoding, compressionAlgorithm, filename);
- this.packetFound = packetFound;
+ this.logger = logger;
this.encryptedDataGenerator = encryptedDataGenerator;
}
@@ -388,15 +360,44 @@ public class EncryptContentPGP extends AbstractProcessor {
*/
@Override
protected void processEncoding(final InputStream inputStream, final
OutputStream encodingOutputStream) throws IOException, PGPException {
- try (final OutputStream encryptedOutputStream =
encryptedDataGenerator.open(encodingOutputStream, createOutputBuffer())) {
- if (packetFound) {
+ try (
+ final PushbackInputStream pushbackInputStream = new
PushbackInputStream(inputStream, OUTPUT_BUFFER_SIZE);
+ final OutputStream encryptedOutputStream =
encryptedDataGenerator.open(encodingOutputStream, createOutputBuffer())
+ ) {
+ if (isPacketFound(pushbackInputStream)) {
// Write OpenPGP packets to encrypted stream without
additional encoding
- StreamUtils.copy(inputStream, encryptedOutputStream);
+ StreamUtils.copy(pushbackInputStream,
encryptedOutputStream);
} else {
- super.processEncoding(inputStream, encryptedOutputStream);
+ super.processEncoding(pushbackInputStream,
encryptedOutputStream);
}
}
encryptedDataGenerator.close();
}
+
+ private boolean isPacketFound(final PushbackInputStream
pushbackInputStream) throws IOException {
+ boolean packetFound = false;
+
+ final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];
+ final int bytesRead = StreamUtils.fillBuffer(pushbackInputStream,
buffer, false);
+ logger.debug("PGP Packet search read buffer bytes [{}]",
bytesRead);
+ try (
+ final ByteArrayInputStream byteArrayInputStream = new
ByteArrayInputStream(buffer);
+ final InputStream decodedInputStream =
PGPUtil.getDecoderStream(byteArrayInputStream);
+ final BCPGInputStream packetInputStream = new
BCPGInputStream(decodedInputStream)
+ ) {
+ final Packet packet = packetInputStream.readPacket();
+ if (packet == null) {
+ logger.debug("PGP Packet not found");
+ } else {
+ packetFound = true;
+ }
+ } catch (final Exception e) {
+ logger.debug("PGP Packet read failed", e);
+ } finally {
+ pushbackInputStream.unread(buffer, 0, bytesRead);
+ }
+
+ return packetFound;
+ }
}
}
diff --git a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java
index 1377e4e289..68ef8e88cc 100644
--- a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java
+++ b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java
@@ -37,7 +37,7 @@ import java.util.Objects;
* Encoding Stream Callback handles writing PGP messages using configured
properties
*/
public class EncodingStreamCallback implements StreamCallback {
- private static final int OUTPUT_BUFFER_SIZE = 8192;
+ protected static final int OUTPUT_BUFFER_SIZE = 8192;
private final FileEncoding fileEncoding;