This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 181b287b1d NIFI-11670 Refactored Content Repo OutputStream to create new Claim per FlowFile
181b287b1d is described below

commit 181b287b1d093fb6a707f5d7af73a582da224a37
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jun 8 09:18:32 2023 -0400

    NIFI-11670 Refactored Content Repo OutputStream to create new Claim per FlowFile
    
    Rather than creating many FlowFiles with the same Content Claim, refactored the content repositories' OutputStreams and ClaimWriteCache so that a new ContentClaim is created for each FlowFile. This ensures that each FlowFile's content claim offset is 0. The poor performance was due to having to use StreamUtils.skip() in conjunction with the CipherInputStream, which would skip at most 511 bytes at a time. By using a separate Content Claim per FlowFile, we no longer need to seek after creating the [...]
    
    This closes #7363
    
    Signed-off-by: David Handermann <[email protected]>
    (cherry picked from commit 702c6350344e4ae4cd31349747e96c1384017ed4)
---
 .../repository/io/ContentClaimOutputStream.java    | 37 ++++++++++++++
 .../repository/FileSystemRepository.java           | 19 ++++++--
 .../claim/StandardContentClaimWriteCache.java      | 57 ++++++++++++++++++----
 .../crypto/EncryptedFileSystemRepository.java      | 23 +++++++--
 .../repositories/EncryptedRepoContentAccessIT.java | 33 +++++++++++++
 5 files changed, 152 insertions(+), 17 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimOutputStream.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimOutputStream.java
new file mode 100644
index 0000000000..2df10c97a7
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimOutputStream.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.io;
+
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class ContentClaimOutputStream extends OutputStream {
+
+    /**
+     * Creates a new Content Claim that is backed by this OutputStream. This 
allows the caller to
+     * create a new Content Claim but ensure that they keep writing to the 
same OutputStream, which can
+     * significantly improve performance.
+     *
+     * @return a new ContentClaim
+     * @throws IOException if unable to finalize the current ContentClaim or 
create a new one
+     */
+    abstract public ContentClaim newContentClaim() throws IOException;
+
+}
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index d99c79f116..0e94cdab6d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -21,6 +21,7 @@ import 
org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaim;
 import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
 import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.io.ContentClaimOutputStream;
 import org.apache.nifi.controller.repository.io.LimitedInputStream;
 import org.apache.nifi.engine.FlowEngine;
 import org.apache.nifi.events.EventReporter;
@@ -1820,13 +1821,13 @@ public class FileSystemRepository implements 
ContentRepository {
 
 
 
-    protected class ContentRepositoryOutputStream extends OutputStream {
-        protected final StandardContentClaim scc;
+    protected class ContentRepositoryOutputStream extends 
ContentClaimOutputStream {
+        protected StandardContentClaim scc;
 
         protected final ByteCountingOutputStream bcos;
 
-        protected final int initialLength;
-        protected long bytesWritten;
+        protected int initialLength;
+        private long bytesWritten;
         protected boolean recycle;
         protected boolean closed;
 
@@ -1965,5 +1966,15 @@ public class FileSystemRepository implements 
ContentRepository {
                 }
             }
         }
+
+
+        @Override
+        public synchronized ContentClaim newContentClaim() throws IOException {
+            scc = new StandardContentClaim(scc.getResourceClaim(), 
scc.getOffset() + scc.getLength());
+            initialLength = 0;
+            bytesWritten = 0L;
+            incrementClaimaintCount(scc);
+            return scc;
+        }
     }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java
index 27cdfac7ff..1a728071c2 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.controller.repository.claim;
 
+import org.apache.nifi.controller.repository.io.ContentClaimOutputStream;
 import org.apache.nifi.controller.repository.ContentRepository;
 import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
 import 
org.apache.nifi.controller.repository.metrics.PerformanceTrackingOutputStream;
@@ -31,7 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 public class StandardContentClaimWriteCache implements ContentClaimWriteCache {
     private final ContentRepository contentRepo;
-    private final Map<ResourceClaim, OutputStream> streamMap = new 
ConcurrentHashMap<>();
+    private final Map<ResourceClaim, MappedOutputStream> streamMap = new 
ConcurrentHashMap<>();
     private final Queue<ContentClaim> queue = new LinkedList<>();
     private final PerformanceTracker performanceTracker;
     private final int bufferSize;
@@ -60,8 +61,15 @@ public class StandardContentClaimWriteCache implements 
ContentClaimWriteCache {
     public ContentClaim getContentClaim() throws IOException {
         final ContentClaim contentClaim = queue.poll();
         if (contentClaim != null) {
-            contentRepo.incrementClaimaintCount(contentClaim);
-            return contentClaim;
+            flush(contentClaim);
+
+            final MappedOutputStream mappedOutputStream = 
streamMap.get(contentClaim.getResourceClaim());
+            if (mappedOutputStream != null) {
+                final OutputStream contentRepoStream = 
mappedOutputStream.getContentRepoStream();
+                if (contentRepoStream instanceof ContentClaimOutputStream) {
+                    return ((ContentClaimOutputStream) 
contentRepoStream).newContentClaim();
+                }
+            }
         }
 
         final ContentClaim claim = contentRepo.create(false);
@@ -73,13 +81,24 @@ public class StandardContentClaimWriteCache implements 
ContentClaimWriteCache {
         final OutputStream out = contentRepo.write(contentClaim);
         final OutputStream performanceTrackingOut = new 
PerformanceTrackingOutputStream(out, performanceTracker);
         final OutputStream buffered = new 
BufferedOutputStream(performanceTrackingOut, bufferSize);
-        streamMap.put(contentClaim.getResourceClaim(), buffered);
+
+        final MappedOutputStream mappedOutputStream = new 
MappedOutputStream(out, buffered);
+        streamMap.put(contentClaim.getResourceClaim(), mappedOutputStream);
         return buffered;
     }
 
+    private OutputStream getWritableStream(final ResourceClaim claim) {
+        final MappedOutputStream mappedOutputStream = streamMap.get(claim);
+        if (mappedOutputStream == null) {
+            return null;
+        }
+
+        return mappedOutputStream.getBufferedStream();
+    }
+
     @Override
     public OutputStream write(final ContentClaim claim) throws IOException {
-        OutputStream out = streamMap.get(claim.getResourceClaim());
+        OutputStream out = getWritableStream(claim.getResourceClaim());
         if (out == null) {
             out = registerStream(claim);
         }
@@ -146,9 +165,9 @@ public class StandardContentClaimWriteCache implements 
ContentClaimWriteCache {
 
     @Override
     public void flush(final ResourceClaim claim) throws IOException {
-        final OutputStream out = streamMap.get(claim);
-        if (out != null) {
-            out.flush();
+        final MappedOutputStream mapped = streamMap.get(claim);
+        if (mapped != null) {
+            mapped.getBufferedStream().flush();
         }
     }
 
@@ -160,9 +179,9 @@ public class StandardContentClaimWriteCache implements 
ContentClaimWriteCache {
     private void forEachStream(final StreamProcessor proc) throws IOException {
         IOException exception = null;
 
-        for (final OutputStream out : streamMap.values()) {
+        for (final MappedOutputStream mapped : streamMap.values()) {
             try {
-                proc.process(out);
+                proc.process(mapped.getBufferedStream());
             } catch (final IOException ioe) {
                 if (exception == null) {
                     exception = ioe;
@@ -181,4 +200,22 @@ public class StandardContentClaimWriteCache implements 
ContentClaimWriteCache {
     private interface StreamProcessor {
         void process(final OutputStream out) throws IOException;
     }
+
+    private static class MappedOutputStream {
+        private final OutputStream contentRepoStream;
+        private final OutputStream bufferedStream;
+
+        public MappedOutputStream(final OutputStream contentRepoStream, final 
OutputStream bufferedStream) {
+            this.contentRepoStream = contentRepoStream;
+            this.bufferedStream = bufferedStream;
+        }
+
+        public OutputStream getContentRepoStream() {
+            return contentRepoStream;
+        }
+
+        public OutputStream getBufferedStream() {
+            return bufferedStream;
+        }
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
index 897b9fa147..63ea0d4abf 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
@@ -225,8 +225,8 @@ public class EncryptedFileSystemRepository extends 
FileSystemRepository {
      * to handle streaming encryption operations.
      */
     private class EncryptedContentRepositoryOutputStream extends 
ContentRepositoryOutputStream {
-        private final CipherOutputStream cipherOutputStream;
-        private final long startingOffset;
+        private CipherOutputStream cipherOutputStream;
+        private long startingOffset;
 
         EncryptedContentRepositoryOutputStream(final StandardContentClaim scc,
                                                final ByteCountingOutputStream 
byteCountingOutputStream,
@@ -296,14 +296,31 @@ public class EncryptedFileSystemRepository extends 
FileSystemRepository {
         public synchronized void close() throws IOException {
             closed = true;
 
+            doFinal();
+            super.close();
+        }
+
+        private void doFinal() throws IOException {
             // Always flush and close (close triggers cipher.doFinal())
             cipherOutputStream.flush();
             cipherOutputStream.close();
 
             // Add the additional bytes written to the scc.length
             scc.setLength(bcos.getBytesWritten() - startingOffset);
+        }
 
-            super.close();
+        @Override
+        public synchronized ContentClaim newContentClaim() throws IOException {
+            doFinal();
+
+            startingOffset = bcos.getBytesWritten();
+            scc = new StandardContentClaim(scc.getResourceClaim(), 
startingOffset);
+
+            final String newRecordId = getRecordId(scc);
+            this.cipherOutputStream = (CipherOutputStream) 
repositoryEncryptor.encrypt(new NonCloseableOutputStream(bcos), newRecordId, 
keyId);
+
+            incrementClaimaintCount(scc);
+            return scc;
         }
     }
 }
diff --git 
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/EncryptedRepoContentAccessIT.java
 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/EncryptedRepoContentAccessIT.java
new file mode 100644
index 0000000000..2cfdb44473
--- /dev/null
+++ 
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/EncryptedRepoContentAccessIT.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.repositories;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EncryptedRepoContentAccessIT extends ContentAccessIT {
+    @Override
+    protected Map<String, String> getNifiPropertiesOverrides() {
+        final Map<String, String> encryptedRepoProperties = new HashMap<>();
+        encryptedRepoProperties.put("nifi.content.repository.implementation", 
"org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository");
+        encryptedRepoProperties.put("nifi.content.repository.encryption.key", 
"0123456789ABCDEFFEDCBA9876543210");
+        
encryptedRepoProperties.put("nifi.content.repository.encryption.key.id", "k1");
+        
encryptedRepoProperties.put("nifi.content.repository.encryption.key.provider.implementation",
 "StaticKeyProvider");
+        return encryptedRepoProperties;
+    }
+}

Reply via email to