[
https://issues.apache.org/jira/browse/NIFI-3356?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15866257#comment-15866257
]
ASF GitHub Bot commented on NIFI-3356:
--------------------------------------
Github user olegz commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1493#discussion_r101099586
--- Diff:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/claim/ContentClaimWriteCache.java
---
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository.claim;
+
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+
+import org.apache.nifi.controller.repository.ContentRepository;
+import org.apache.nifi.stream.io.ByteCountingOutputStream;
+
+public class ContentClaimWriteCache {
+    private final ContentRepository contentRepo;
+    private final Map<ResourceClaim, ByteCountingOutputStream> streamMap = new HashMap<>();
+    private final Queue<ContentClaim> queue = new LinkedList<>();
+    private final int bufferSize;
+
+    public ContentClaimWriteCache(final ContentRepository contentRepo) {
+        this(contentRepo, 8192);
+    }
+
+    public ContentClaimWriteCache(final ContentRepository contentRepo, final int bufferSize) {
+        this.contentRepo = contentRepo;
+        this.bufferSize = bufferSize;
+    }
+
+    public void reset() throws IOException {
+        try {
+            forEachStream(OutputStream::close);
+        } finally {
+            streamMap.clear();
+            queue.clear();
+        }
+    }
+
+    public ContentClaim getContentClaim() throws IOException {
+        final ContentClaim contentClaim = queue.poll();
+        if (contentClaim != null) {
+            contentRepo.incrementClaimaintCount(contentClaim);
+            return contentClaim;
+        }
+
+        final ContentClaim claim = contentRepo.create(false);
+        registerStream(claim);
+        return claim;
+    }
+
+    private ByteCountingOutputStream registerStream(final ContentClaim contentClaim) throws IOException {
+        final OutputStream out = contentRepo.write(contentClaim);
+        final OutputStream buffered = new BufferedOutputStream(out, bufferSize);
+        final ByteCountingOutputStream bcos = new ByteCountingOutputStream(buffered);
+        streamMap.put(contentClaim.getResourceClaim(), bcos);
+        return bcos;
+    }
+
+    public OutputStream write(final ContentClaim claim) throws IOException {
+        OutputStream out = streamMap.get(claim.getResourceClaim());
+        if (out == null) {
+            out = registerStream(claim);
+        }
+
+        if (!(claim instanceof StandardContentClaim)) {
+            // we know that we will only create Content Claims that are of type StandardContentClaim,
+            // so if we get anything else, just throw an Exception because it is not valid for this Repository
+            throw new IllegalArgumentException("Cannot write to " + claim
+                + " because that Content Claim does not belong to this Claim Cache");
+        }
+
+        final StandardContentClaim scc = (StandardContentClaim) claim;
+        final long initialLength = Math.max(0L, scc.getLength());
+
+        final OutputStream bcos = out;
+        return new OutputStream() {
+            private long bytesWritten = 0L;
+
+            @Override
+            public void write(final int b) throws IOException {
+                bcos.write(b);
+                bytesWritten++;
+                scc.setLength(initialLength + bytesWritten);
+            }
+
+            @Override
+            public void write(final byte[] b, final int off, final int len) throws IOException {
+                bcos.write(b, off, len);
+                bytesWritten += len;
+                scc.setLength(initialLength + bytesWritten);
+            }
+
--- End diff ---
It is correct once I make this change
```
private long bytesWritten = Math.max(0L, scc.getLength());
```
And there is an existing test to prove it. Curious, though: why would
scc.getLength() ever be < 0?
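To illustrate the point, here is a minimal, self-contained sketch (not NiFi code; `FakeClaim` and `track` are hypothetical stand-ins for `StandardContentClaim` and the anonymous wrapper in the diff) showing how seeding the counter from the claim's current length keeps `setLength` consistent even when the same claim is wrapped more than once:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;

public class LengthTrackingDemo {

    // hypothetical stand-in for StandardContentClaim; -1 means "nothing written yet"
    static class FakeClaim {
        private long length = -1L;
        void setLength(long len) { this.length = len; }
        long getLength() { return length; }
    }

    // wraps a stream and keeps the claim's length in sync; the counter is
    // seeded from the claim's current length, as suggested in the comment
    static OutputStream track(final OutputStream delegate, final FakeClaim claim) {
        return new OutputStream() {
            private long bytesWritten = Math.max(0L, claim.getLength());

            @Override
            public void write(final int b) throws IOException {
                delegate.write(b);
                bytesWritten++;
                claim.setLength(bytesWritten);
            }

            @Override
            public void write(final byte[] b, final int off, final int len) throws IOException {
                delegate.write(b, off, len);
                bytesWritten += len;
                claim.setLength(bytesWritten);
            }
        };
    }

    static long demo() throws IOException {
        final FakeClaim claim = new FakeClaim();
        final ByteArrayOutputStream sink = new ByteArrayOutputStream();

        track(sink, claim).write(new byte[5]);  // first wrapper: length becomes 5
        track(sink, claim).write(new byte[3]);  // second wrapper seeds at 5, length becomes 8

        return claim.getLength();
    }

    public static void main(String[] args) throws IOException {
        System.out.println(demo());  // prints 8
    }
}
```

Because `OutputStream.write(byte[])` delegates to the overridden `write(byte[], int, int)`, every write path updates the length.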
> Provide a newly refactored provenance repository
> ------------------------------------------------
>
> Key: NIFI-3356
> URL: https://issues.apache.org/jira/browse/NIFI-3356
> Project: Apache NiFi
> Issue Type: Task
> Components: Core Framework
> Reporter: Mark Payne
> Assignee: Mark Payne
> Fix For: 1.2.0
>
>
> The Persistent Provenance Repository has been redesigned a few different
> times over several years. The original design for the repository was to
> provide storage of events and sequential iteration over those events via a
> Reporting Task. After that, we added the ability to compress the data so that
> it could be held longer. We then introduced the notion of indexing and
> searching via Lucene. We've since made several more modifications to try to
> boost performance.
> At this point, however, the repository is still the bottleneck for many flows
> that handle large volumes of small FlowFiles. We need a new implementation
> that is based around the current goals for the repository and that can
> provide better throughput.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)