This is an automated email from the ASF dual-hosted git repository.

greyp pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new bc5f00a667 NIFI-11437 Switched to StreamUtils.fillBuffer() for buffer, Improved EncryptContentPGP Content Type Detection
bc5f00a667 is described below

commit bc5f00a6671c0f3fd9d6c8599d196f414ac47fd9
Author: exceptionfactory <[email protected]>
AuthorDate: Wed Apr 12 13:00:40 2023 -0500

    NIFI-11437 Switched to StreamUtils.fillBuffer() for buffer, Improved EncryptContentPGP Content Type Detection
    
    This closes #7166
    Signed-off-by: Paul Grey <[email protected]>
---
 .../nifi/processors/pgp/EncryptContentPGP.java     | 85 +++++++++++-----------
 .../processors/pgp/io/EncodingStreamCallback.java  |  2 +-
 2 files changed, 44 insertions(+), 43 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java
 
b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java
index a45d90d1f0..9b676fd830 100644
--- 
a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java
+++ 
b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/EncryptContentPGP.java
@@ -29,12 +29,12 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.pgp.service.api.PGPPublicKeyService;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.io.StreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.pgp.attributes.CompressionAlgorithm;
@@ -57,9 +57,11 @@ import 
org.bouncycastle.openpgp.operator.bc.BcPGPDataEncryptorBuilder;
 import 
org.bouncycastle.openpgp.operator.bc.BcPublicKeyKeyEncryptionMethodGenerator;
 import 
org.bouncycastle.openpgp.operator.jcajce.JcePBEKeyEncryptionMethodGenerator;
 
+import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.io.PushbackInputStream;
 import java.security.SecureRandom;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -198,14 +200,10 @@ public class EncryptContentPGP extends AbstractProcessor {
         }
 
         try {
-            final PacketReadInputStreamCallback packetCallback = new 
PacketReadInputStreamCallback();
-            session.read(flowFile, packetCallback);
-
             final SymmetricKeyAlgorithm symmetricKeyAlgorithm = 
getSymmetricKeyAlgorithm(context);
             final FileEncoding fileEncoding = getFileEncoding(context);
             final CompressionAlgorithm compressionAlgorithm = 
getCompressionAlgorithm(context);
-            final StreamCallback callback = getEncryptStreamCallback(context, 
flowFile, symmetricKeyAlgorithm,
-                    compressionAlgorithm, fileEncoding, 
packetCallback.packetFound);
+            final StreamCallback callback = getEncryptStreamCallback(context, 
flowFile, symmetricKeyAlgorithm, compressionAlgorithm, fileEncoding);
             flowFile = session.write(flowFile, callback);
 
             final Map<String, String> attributes = 
getAttributes(symmetricKeyAlgorithm, fileEncoding, compressionAlgorithm);
@@ -267,8 +265,7 @@ public class EncryptContentPGP extends AbstractProcessor {
                                                     final FlowFile flowFile,
                                                     final 
SymmetricKeyAlgorithm symmetricKeyAlgorithm,
                                                     final CompressionAlgorithm 
compressionAlgorithm,
-                                                    final FileEncoding 
fileEncoding,
-                                                    final boolean packetFound) 
{
+                                                    final FileEncoding 
fileEncoding) {
         final SecureRandom secureRandom = new SecureRandom();
         final PGPDataEncryptorBuilder dataEncryptorBuilder = new 
BcPGPDataEncryptorBuilder(symmetricKeyAlgorithm.getId())
                 .setSecureRandom(secureRandom)
@@ -278,7 +275,7 @@ public class EncryptContentPGP extends AbstractProcessor {
         methodGenerators.forEach(encryptedDataGenerator::addMethod);
 
         final String filename = 
flowFile.getAttribute(CoreAttributes.FILENAME.key());
-        return new EncryptStreamCallback(fileEncoding, compressionAlgorithm, 
filename, packetFound, encryptedDataGenerator);
+        return new EncryptStreamCallback(fileEncoding, compressionAlgorithm, 
filename, getLogger(), encryptedDataGenerator);
     }
 
     private List<PGPKeyEncryptionMethodGenerator> 
getEncryptionMethodGenerators(final ProcessContext context,
@@ -338,43 +335,18 @@ public class EncryptContentPGP extends AbstractProcessor {
         return attributes;
     }
 
-    private class PacketReadInputStreamCallback implements InputStreamCallback 
{
-        private boolean packetFound;
-
-        /**
-         * Process Input Stream and attempt to read OpenPGP Packet for content 
detection
-         *
-         * @param inputStream Input Stream to be read
-         */
-        @Override
-        public void process(final InputStream inputStream) {
-            try {
-                final InputStream decodedInputStream = 
PGPUtil.getDecoderStream(inputStream);
-                final BCPGInputStream packetInputStream = new 
BCPGInputStream(decodedInputStream);
-                final Packet packet = packetInputStream.readPacket();
-                if (packet == null) {
-                    getLogger().debug("PGP Packet not found");
-                } else {
-                    packetFound = true;
-                }
-            } catch (final IOException e) {
-                getLogger().debug("PGP Packet read failed", e);
-            }
-        }
-    }
-
     private static class EncryptStreamCallback extends EncodingStreamCallback {
-        private final boolean packetFound;
-
         private final PGPEncryptedDataGenerator encryptedDataGenerator;
 
+        private final ComponentLog logger;
+
         public EncryptStreamCallback(final FileEncoding fileEncoding,
                                      final CompressionAlgorithm 
compressionAlgorithm,
                                      final String filename,
-                                     final boolean packetFound,
+                                     final ComponentLog logger,
                                      final PGPEncryptedDataGenerator 
encryptedDataGenerator) {
             super(fileEncoding, compressionAlgorithm, filename);
-            this.packetFound = packetFound;
+            this.logger = logger;
             this.encryptedDataGenerator = encryptedDataGenerator;
         }
 
@@ -388,15 +360,44 @@ public class EncryptContentPGP extends AbstractProcessor {
          */
         @Override
         protected void processEncoding(final InputStream inputStream, final 
OutputStream encodingOutputStream) throws IOException, PGPException {
-            try (final OutputStream encryptedOutputStream = 
encryptedDataGenerator.open(encodingOutputStream, createOutputBuffer())) {
-                if (packetFound) {
+            try (
+                    final PushbackInputStream pushbackInputStream = new 
PushbackInputStream(inputStream, OUTPUT_BUFFER_SIZE);
+                    final OutputStream encryptedOutputStream = 
encryptedDataGenerator.open(encodingOutputStream, createOutputBuffer())
+            ) {
+                if (isPacketFound(pushbackInputStream)) {
                     // Write OpenPGP packets to encrypted stream without 
additional encoding
-                    StreamUtils.copy(inputStream, encryptedOutputStream);
+                    StreamUtils.copy(pushbackInputStream, 
encryptedOutputStream);
                 } else {
-                    super.processEncoding(inputStream, encryptedOutputStream);
+                    super.processEncoding(pushbackInputStream, 
encryptedOutputStream);
                 }
             }
             encryptedDataGenerator.close();
         }
+
+        private boolean isPacketFound(final PushbackInputStream 
pushbackInputStream) throws IOException {
+            boolean packetFound = false;
+
+            final byte[] buffer = new byte[OUTPUT_BUFFER_SIZE];
+            final int bytesRead = StreamUtils.fillBuffer(pushbackInputStream, 
buffer, false);
+            logger.debug("PGP Packet search read buffer bytes [{}]", 
bytesRead);
+            try (
+                    final ByteArrayInputStream byteArrayInputStream = new 
ByteArrayInputStream(buffer);
+                    final InputStream decodedInputStream = 
PGPUtil.getDecoderStream(byteArrayInputStream);
+                    final BCPGInputStream packetInputStream = new 
BCPGInputStream(decodedInputStream)
+            ) {
+                final Packet packet = packetInputStream.readPacket();
+                if (packet == null) {
+                    logger.debug("PGP Packet not found");
+                } else {
+                    packetFound = true;
+                }
+            } catch (final Exception e) {
+                logger.debug("PGP Packet read failed", e);
+            } finally {
+                pushbackInputStream.unread(buffer, 0, bytesRead);
+            }
+
+            return packetFound;
+        }
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java
 
b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java
index 1377e4e289..68ef8e88cc 100644
--- 
a/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java
+++ 
b/nifi-nar-bundles/nifi-pgp-bundle/nifi-pgp-processors/src/main/java/org/apache/nifi/processors/pgp/io/EncodingStreamCallback.java
@@ -37,7 +37,7 @@ import java.util.Objects;
  * Encoding Stream Callback handles writing PGP messages using configured 
properties
  */
 public class EncodingStreamCallback implements StreamCallback {
-    private static final int OUTPUT_BUFFER_SIZE = 8192;
+    protected static final int OUTPUT_BUFFER_SIZE = 8192;
 
     private final FileEncoding fileEncoding;
 

Reply via email to