This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 702c635034 NIFI-11670 Refactored Content Repo OutputStream to create
new Claim per FlowFile
702c635034 is described below
commit 702c6350344e4ae4cd31349747e96c1384017ed4
Author: Mark Payne <[email protected]>
AuthorDate: Thu Jun 8 09:18:32 2023 -0400
NIFI-11670 Refactored Content Repo OutputStream to create new Claim per
FlowFile
Rather than creating many FlowFiles that share the same Content Claim, refactored
the content repositories' OutputStreams and the ClaimWriteCache so that a new
ContentClaim is created for each FlowFile. This ensures that each FlowFile has a
content claim offset of 0. The poor read performance of the encrypted content
repository was due to having to use StreamUtils.skip() in conjunction with the
CipherInputStream, which would only skip a maximum of 511 bytes at a time. By
using a separate Content Claim per FlowFile, we no longer need to seek after
creating the [...]
This closes #7363
Signed-off-by: David Handermann <[email protected]>
---
.../repository/io/ContentClaimOutputStream.java | 37 ++++++++++++++
.../repository/FileSystemRepository.java | 19 ++++++--
.../claim/StandardContentClaimWriteCache.java | 57 ++++++++++++++++++----
.../crypto/EncryptedFileSystemRepository.java | 23 +++++++--
.../repositories/EncryptedRepoContentAccessIT.java | 33 +++++++++++++
5 files changed, 152 insertions(+), 17 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimOutputStream.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimOutputStream.java
new file mode 100644
index 0000000000..2df10c97a7
--- /dev/null
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-components/src/main/java/org/apache/nifi/controller/repository/io/ContentClaimOutputStream.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.io;
+
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public abstract class ContentClaimOutputStream extends OutputStream {
+
+ /**
+ * Creates a new Content Claim that is backed by this OutputStream. This
allows the caller to
+ * create a new Content Claim but ensure that they keep writing to the
same OutputStream, which can
+ * significantly improve performance.
+ *
+ * @return a new ContentClaim
+ * @throws IOException if unable to finalize the current ContentClaim or
create a new one
+ */
+ abstract public ContentClaim newContentClaim() throws IOException;
+
+}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
index f6118334ed..f151f21de3 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/FileSystemRepository.java
@@ -21,6 +21,7 @@ import
org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
+import org.apache.nifi.controller.repository.io.ContentClaimOutputStream;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.events.EventReporter;
@@ -1820,13 +1821,13 @@ public class FileSystemRepository implements
ContentRepository {
- protected class ContentRepositoryOutputStream extends OutputStream {
- protected final StandardContentClaim scc;
+ protected class ContentRepositoryOutputStream extends
ContentClaimOutputStream {
+ protected StandardContentClaim scc;
protected final ByteCountingOutputStream bcos;
- protected final int initialLength;
- protected long bytesWritten;
+ protected int initialLength;
+ private long bytesWritten;
protected boolean recycle;
protected boolean closed;
@@ -1965,5 +1966,15 @@ public class FileSystemRepository implements
ContentRepository {
}
}
}
+
+
+ @Override
+ public synchronized ContentClaim newContentClaim() throws IOException {
+ scc = new StandardContentClaim(scc.getResourceClaim(),
scc.getOffset() + scc.getLength());
+ initialLength = 0;
+ bytesWritten = 0L;
+ incrementClaimaintCount(scc);
+ return scc;
+ }
}
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java
index 27cdfac7ff..1a728071c2 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/StandardContentClaimWriteCache.java
@@ -17,6 +17,7 @@
package org.apache.nifi.controller.repository.claim;
+import org.apache.nifi.controller.repository.io.ContentClaimOutputStream;
import org.apache.nifi.controller.repository.ContentRepository;
import org.apache.nifi.controller.repository.metrics.PerformanceTracker;
import
org.apache.nifi.controller.repository.metrics.PerformanceTrackingOutputStream;
@@ -31,7 +32,7 @@ import java.util.concurrent.ConcurrentHashMap;
public class StandardContentClaimWriteCache implements ContentClaimWriteCache {
private final ContentRepository contentRepo;
- private final Map<ResourceClaim, OutputStream> streamMap = new
ConcurrentHashMap<>();
+ private final Map<ResourceClaim, MappedOutputStream> streamMap = new
ConcurrentHashMap<>();
private final Queue<ContentClaim> queue = new LinkedList<>();
private final PerformanceTracker performanceTracker;
private final int bufferSize;
@@ -60,8 +61,15 @@ public class StandardContentClaimWriteCache implements
ContentClaimWriteCache {
public ContentClaim getContentClaim() throws IOException {
final ContentClaim contentClaim = queue.poll();
if (contentClaim != null) {
- contentRepo.incrementClaimaintCount(contentClaim);
- return contentClaim;
+ flush(contentClaim);
+
+ final MappedOutputStream mappedOutputStream =
streamMap.get(contentClaim.getResourceClaim());
+ if (mappedOutputStream != null) {
+ final OutputStream contentRepoStream =
mappedOutputStream.getContentRepoStream();
+ if (contentRepoStream instanceof ContentClaimOutputStream) {
+ return ((ContentClaimOutputStream)
contentRepoStream).newContentClaim();
+ }
+ }
}
final ContentClaim claim = contentRepo.create(false);
@@ -73,13 +81,24 @@ public class StandardContentClaimWriteCache implements
ContentClaimWriteCache {
final OutputStream out = contentRepo.write(contentClaim);
final OutputStream performanceTrackingOut = new
PerformanceTrackingOutputStream(out, performanceTracker);
final OutputStream buffered = new
BufferedOutputStream(performanceTrackingOut, bufferSize);
- streamMap.put(contentClaim.getResourceClaim(), buffered);
+
+ final MappedOutputStream mappedOutputStream = new
MappedOutputStream(out, buffered);
+ streamMap.put(contentClaim.getResourceClaim(), mappedOutputStream);
return buffered;
}
+ private OutputStream getWritableStream(final ResourceClaim claim) {
+ final MappedOutputStream mappedOutputStream = streamMap.get(claim);
+ if (mappedOutputStream == null) {
+ return null;
+ }
+
+ return mappedOutputStream.getBufferedStream();
+ }
+
@Override
public OutputStream write(final ContentClaim claim) throws IOException {
- OutputStream out = streamMap.get(claim.getResourceClaim());
+ OutputStream out = getWritableStream(claim.getResourceClaim());
if (out == null) {
out = registerStream(claim);
}
@@ -146,9 +165,9 @@ public class StandardContentClaimWriteCache implements
ContentClaimWriteCache {
@Override
public void flush(final ResourceClaim claim) throws IOException {
- final OutputStream out = streamMap.get(claim);
- if (out != null) {
- out.flush();
+ final MappedOutputStream mapped = streamMap.get(claim);
+ if (mapped != null) {
+ mapped.getBufferedStream().flush();
}
}
@@ -160,9 +179,9 @@ public class StandardContentClaimWriteCache implements
ContentClaimWriteCache {
private void forEachStream(final StreamProcessor proc) throws IOException {
IOException exception = null;
- for (final OutputStream out : streamMap.values()) {
+ for (final MappedOutputStream mapped : streamMap.values()) {
try {
- proc.process(out);
+ proc.process(mapped.getBufferedStream());
} catch (final IOException ioe) {
if (exception == null) {
exception = ioe;
@@ -181,4 +200,22 @@ public class StandardContentClaimWriteCache implements
ContentClaimWriteCache {
private interface StreamProcessor {
void process(final OutputStream out) throws IOException;
}
+
+ private static class MappedOutputStream {
+ private final OutputStream contentRepoStream;
+ private final OutputStream bufferedStream;
+
+ public MappedOutputStream(final OutputStream contentRepoStream, final
OutputStream bufferedStream) {
+ this.contentRepoStream = contentRepoStream;
+ this.bufferedStream = bufferedStream;
+ }
+
+ public OutputStream getContentRepoStream() {
+ return contentRepoStream;
+ }
+
+ public OutputStream getBufferedStream() {
+ return bufferedStream;
+ }
+ }
}
diff --git
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
index 897b9fa147..63ea0d4abf 100644
---
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
+++
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/crypto/EncryptedFileSystemRepository.java
@@ -225,8 +225,8 @@ public class EncryptedFileSystemRepository extends
FileSystemRepository {
* to handle streaming encryption operations.
*/
private class EncryptedContentRepositoryOutputStream extends
ContentRepositoryOutputStream {
- private final CipherOutputStream cipherOutputStream;
- private final long startingOffset;
+ private CipherOutputStream cipherOutputStream;
+ private long startingOffset;
EncryptedContentRepositoryOutputStream(final StandardContentClaim scc,
final ByteCountingOutputStream
byteCountingOutputStream,
@@ -296,14 +296,31 @@ public class EncryptedFileSystemRepository extends
FileSystemRepository {
public synchronized void close() throws IOException {
closed = true;
+ doFinal();
+ super.close();
+ }
+
+ private void doFinal() throws IOException {
// Always flush and close (close triggers cipher.doFinal())
cipherOutputStream.flush();
cipherOutputStream.close();
// Add the additional bytes written to the scc.length
scc.setLength(bcos.getBytesWritten() - startingOffset);
+ }
- super.close();
+ @Override
+ public synchronized ContentClaim newContentClaim() throws IOException {
+ doFinal();
+
+ startingOffset = bcos.getBytesWritten();
+ scc = new StandardContentClaim(scc.getResourceClaim(),
startingOffset);
+
+ final String newRecordId = getRecordId(scc);
+ this.cipherOutputStream = (CipherOutputStream)
repositoryEncryptor.encrypt(new NonCloseableOutputStream(bcos), newRecordId,
keyId);
+
+ incrementClaimaintCount(scc);
+ return scc;
}
}
}
diff --git
a/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/EncryptedRepoContentAccessIT.java
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/EncryptedRepoContentAccessIT.java
new file mode 100644
index 0000000000..2cfdb44473
--- /dev/null
+++
b/nifi-system-tests/nifi-system-test-suite/src/test/java/org/apache/nifi/tests/system/repositories/EncryptedRepoContentAccessIT.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.tests.system.repositories;
+
+import java.util.HashMap;
+import java.util.Map;
+
+public class EncryptedRepoContentAccessIT extends ContentAccessIT {
+ @Override
+ protected Map<String, String> getNifiPropertiesOverrides() {
+ final Map<String, String> encryptedRepoProperties = new HashMap<>();
+ encryptedRepoProperties.put("nifi.content.repository.implementation",
"org.apache.nifi.controller.repository.crypto.EncryptedFileSystemRepository");
+ encryptedRepoProperties.put("nifi.content.repository.encryption.key",
"0123456789ABCDEFFEDCBA9876543210");
+
encryptedRepoProperties.put("nifi.content.repository.encryption.key.id", "k1");
+
encryptedRepoProperties.put("nifi.content.repository.encryption.key.provider.implementation",
"StaticKeyProvider");
+ return encryptedRepoProperties;
+ }
+}