Repository: nifi
Updated Branches:
  refs/heads/master b207397a1 -> 39cfd0339


NIFI-3205: This closes #1336. Ensure that FlowFile Repository is updated with 
any Transient Content Claims when session rollback occurs


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/39cfd033
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/39cfd033
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/39cfd033

Branch: refs/heads/master
Commit: 39cfd0339756e3869d0e3f3bcf45569855d6bca3
Parents: b207397
Author: Mark Payne <[email protected]>
Authored: Fri Dec 16 09:01:01 2016 -0500
Committer: joewitt <[email protected]>
Committed: Wed Feb 15 13:15:56 2017 -0500

----------------------------------------------------------------------
 .../repository/RepositoryRecordType.java        |  2 +-
 .../java/org/apache/nifi/util/MockFlowFile.java | 14 ++-
 .../repository/StandardFlowFileRecord.java      |  6 +-
 .../repository/StandardProcessSession.java      | 19 ++++-
 .../TransientClaimRepositoryRecord.java         | 89 ++++++++++++++++++++
 .../WriteAheadFlowFileRepository.java           | 17 +++-
 .../repository/TestStandardProcessSession.java  | 77 +++++++++++++++++
 7 files changed, 213 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/39cfd033/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java
----------------------------------------------------------------------
diff --git 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java
 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java
index ff8dc50..50221bb 100644
--- 
a/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java
+++ 
b/nifi-framework-api/src/main/java/org/apache/nifi/controller/repository/RepositoryRecordType.java
@@ -21,5 +21,5 @@ package org.apache.nifi.controller.repository;
  */
 public enum RepositoryRecordType {
 
-    UPDATE, CREATE, DELETE, CONTENTMISSING, SWAP_IN, SWAP_OUT;
+    UPDATE, CREATE, DELETE, CONTENTMISSING, SWAP_IN, SWAP_OUT, 
CLEANUP_TRANSIENT_CLAIMS;
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/39cfd033/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java 
b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
index 9848a3d..1ff1a2f 100644
--- a/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
+++ b/nifi-mock/src/main/java/org/apache/nifi/util/MockFlowFile.java
@@ -33,6 +33,7 @@ import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
 
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.nifi.controller.repository.FlowFileRecord;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
 import org.apache.nifi.flowfile.FlowFile;
@@ -142,7 +143,7 @@ public class MockFlowFile implements FlowFileRecord {
 
     @Override
     public int hashCode() {
-        return (int) id;
+        return new HashCodeBuilder(7, 13).append(id).toHashCode();
     }
 
     @Override
@@ -150,8 +151,11 @@ public class MockFlowFile implements FlowFileRecord {
         if (obj == null) {
             return false;
         }
-        if (obj instanceof MockFlowFile) {
-            return ((MockFlowFile) obj).id == this.id;
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof FlowFile) {
+            return ((FlowFile) obj).getId() == this.id;
         }
         return false;
     }
@@ -291,10 +295,12 @@ public class MockFlowFile implements FlowFileRecord {
     public long getQueueDateIndex() {
         return 0;
     }
+
     public boolean isAttributeEqual(final String attributeName, final String 
expectedValue) {
         // unknown attribute name, so cannot be equal.
-        if (attributes.containsKey(attributeName) == false)
+        if (attributes.containsKey(attributeName) == false) {
             return false;
+        }
 
         String value = attributes.get(attributeName);
         return Objects.equals(expectedValue, value);

http://git-wip-us.apache.org/repos/asf/nifi/blob/39cfd033/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
index 607ccfd..088f26d 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardFlowFileRecord.java
@@ -145,11 +145,11 @@ public final class StandardFlowFileRecord implements 
FlowFile, FlowFileRecord {
         if (this == other) {
             return true;
         }
-        if (!(other instanceof StandardFlowFileRecord)) {
+        if (!(other instanceof FlowFile)) {
             return false;
         }
-        final StandardFlowFileRecord otherRecord = (StandardFlowFileRecord) 
other;
-        return new EqualsBuilder().append(id, otherRecord.id).isEquals();
+        final FlowFile otherRecord = (FlowFile) other;
+        return new EqualsBuilder().append(id, otherRecord.getId()).isEquals();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/39cfd033/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index f7dfd73..54987b9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -47,13 +47,13 @@ import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.provenance.ProvenanceReporter;
 import org.apache.nifi.provenance.StandardProvenanceEventRecord;
-import org.apache.nifi.stream.io.BufferedOutputStream;
 import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.stream.io.ByteCountingOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.BufferedOutputStream;
 import java.io.ByteArrayInputStream;
 import java.io.EOFException;
 import java.io.IOException;
@@ -973,6 +973,23 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             }
         }
 
+        // If we have transient claims that need to be cleaned up, do so.
+        final List<ContentClaim> transientClaims = recordsToHandle.stream()
+            .flatMap(record -> record.getTransientClaims().stream())
+            .collect(Collectors.toList());
+
+        if (!transientClaims.isEmpty()) {
+            final RepositoryRecord repoRecord = new 
TransientClaimRepositoryRecord(transientClaims);
+            try {
+                
context.getFlowFileRepository().updateRepository(Collections.singletonList(repoRecord));
+            } catch (final IOException ioe) {
+                LOG.error("Unable to update FlowFile repository to cleanup 
transient claims due to {}", ioe.toString());
+                if (LOG.isDebugEnabled()) {
+                    LOG.error("", ioe);
+                }
+            }
+        }
+
         final Connectable connectable = context.getConnectable();
         final StandardFlowFileEvent flowFileEvent = new 
StandardFlowFileEvent(connectable.getIdentifier());
         flowFileEvent.setBytesRead(bytesRead);

http://git-wip-us.apache.org/repos/asf/nifi/blob/39cfd033/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java
new file mode 100644
index 0000000..8cf6952
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/TransientClaimRepositoryRecord.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.controller.repository;
+
+import java.util.List;
+
+import org.apache.nifi.controller.queue.FlowFileQueue;
+import org.apache.nifi.controller.repository.claim.ContentClaim;
+
+/**
+ * A simple RepositoryRecord that represents a List of Content Claims that need 
to be cleaned up
+ */
+public class TransientClaimRepositoryRecord implements RepositoryRecord {
+    private final List<ContentClaim> claimsToCleanUp;
+
+    public TransientClaimRepositoryRecord(final List<ContentClaim> 
claimsToCleanUp) {
+        this.claimsToCleanUp = claimsToCleanUp;
+    }
+
+    @Override
+    public FlowFileQueue getDestination() {
+        return null;
+    }
+
+    @Override
+    public FlowFileQueue getOriginalQueue() {
+        return null;
+    }
+
+    @Override
+    public RepositoryRecordType getType() {
+        return RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS;
+    }
+
+    @Override
+    public ContentClaim getCurrentClaim() {
+        return null;
+    }
+
+    @Override
+    public ContentClaim getOriginalClaim() {
+        return null;
+    }
+
+    @Override
+    public long getCurrentClaimOffset() {
+        return 0;
+    }
+
+    @Override
+    public FlowFileRecord getCurrent() {
+        return null;
+    }
+
+    @Override
+    public boolean isAttributesChanged() {
+        return false;
+    }
+
+    @Override
+    public boolean isMarkedForAbort() {
+        return false;
+    }
+
+    @Override
+    public String getSwapLocation() {
+        return null;
+    }
+
+    @Override
+    public List<ContentClaim> getTransientClaims() {
+        return claimsToCleanUp;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/39cfd033/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 071527c..b5807ca 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -21,6 +21,7 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -35,6 +36,7 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.repository.claim.ContentClaim;
@@ -201,13 +203,24 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
     private void updateRepository(final Collection<RepositoryRecord> records, 
final boolean sync) throws IOException {
         for (final RepositoryRecord record : records) {
-            if (record.getType() != RepositoryRecordType.DELETE && 
record.getType() != RepositoryRecordType.CONTENTMISSING && 
record.getDestination() == null) {
+            if (record.getType() != RepositoryRecordType.DELETE && 
record.getType() != RepositoryRecordType.CONTENTMISSING
+                && record.getType() != 
RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS && record.getDestination() == 
null) {
                 throw new IllegalArgumentException("Record " + record + " has 
no destination and Type is " + record.getType());
             }
         }
 
+        // Partition records by whether or not their type is 
'CLEANUP_TRANSIENT_CLAIMS'. We do this because we don't want to send
+        // these types of records to the Write-Ahead Log.
+        final Map<Boolean, List<RepositoryRecord>> partitionedRecords = 
records.stream()
+            .collect(Collectors.partitioningBy(record -> record.getType() == 
RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS));
+
+        List<RepositoryRecord> recordsForWal = 
partitionedRecords.get(Boolean.FALSE);
+        if (recordsForWal == null) {
+            recordsForWal = Collections.emptyList();
+        }
+
         // update the repository.
-        final int partitionIndex = wal.update(records, sync);
+        final int partitionIndex = wal.update(recordsForWal, sync);
 
         // The below code is not entirely thread-safe, but we are OK with that 
because the results aren't really harmful.
         // Specifically, if two different threads call updateRepository with 
DELETE records for the same Content Claim,

http://git-wip-us.apache.org/repos/asf/nifi/blob/39cfd033/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 6f94994..9070d0c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -81,6 +81,7 @@ import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventRepository;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.stream.io.StreamUtils;
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.NiFiProperties;
 import org.junit.After;
 import org.junit.Assert;
@@ -1488,10 +1489,81 @@ public class TestStandardProcessSession {
         session.commit();
     }
 
+    @Test
+    public void 
testNewFlowFileModifiedMultipleTimesHasTransientClaimsOnCommit() {
+        FlowFile flowFile = session.create();
+        for (int i = 0; i < 5; i++) {
+            final byte[] content = String.valueOf(i).getBytes();
+            flowFile = session.write(flowFile, out -> out.write(content));
+        }
+
+        session.transfer(flowFile, new 
Relationship.Builder().name("success").build());
+        session.commit();
+
+        final List<RepositoryRecord> repoUpdates = flowFileRepo.getUpdates();
+        assertEquals(1, repoUpdates.size());
+
+        // Should be 4 transient claims because it was written to 5 times. So 
4 transient + 1 actual claim.
+        final RepositoryRecord record = repoUpdates.get(0);
+        assertEquals(RepositoryRecordType.CREATE, record.getType());
+        final List<ContentClaim> transientClaims = record.getTransientClaims();
+        assertEquals(4, transientClaims.size());
+    }
+
+
+    @Test
+    public void 
testUpdateFlowFileModifiedMultipleTimesHasTransientClaimsOnCommit() {
+        flowFileQueue.put(new MockFlowFile(1L));
+
+        FlowFile flowFile = session.get();
+        for (int i = 0; i < 5; i++) {
+            final byte[] content = String.valueOf(i).getBytes();
+            flowFile = session.write(flowFile, out -> out.write(content));
+        }
+
+        session.transfer(flowFile, new 
Relationship.Builder().name("success").build());
+        session.commit();
+
+        final List<RepositoryRecord> repoUpdates = flowFileRepo.getUpdates();
+        assertEquals(1, repoUpdates.size());
+
+        // Should be 4 transient claims because it was written to 5 times. So 
4 transient + 1 actual claim.
+        final RepositoryRecord record = repoUpdates.get(0);
+        assertEquals(RepositoryRecordType.UPDATE, record.getType());
+        final List<ContentClaim> transientClaims = record.getTransientClaims();
+        assertEquals(4, transientClaims.size());
+    }
+
+
+    @Test
+    public void 
testUpdateFlowFileModifiedMultipleTimesHasTransientClaimsOnRollback() {
+        flowFileQueue.put(new MockFlowFile(1L));
+
+        FlowFile flowFile = session.get();
+        for (int i = 0; i < 5; i++) {
+            final byte[] content = String.valueOf(i).getBytes();
+            flowFile = session.write(flowFile, out -> out.write(content));
+        }
+
+        session.rollback();
+
+        final List<RepositoryRecord> repoUpdates = flowFileRepo.getUpdates();
+        assertEquals(1, repoUpdates.size());
+
+        // Should be 5 transient claims because it was written to 5 times and 
then rolled back so all
+        // content claims are 'transient'.
+        final RepositoryRecord record = repoUpdates.get(0);
+        assertEquals(RepositoryRecordType.CLEANUP_TRANSIENT_CLAIMS, 
record.getType());
+        final List<ContentClaim> transientClaims = record.getTransientClaims();
+        assertEquals(5, transientClaims.size());
+    }
+
+
     private static class MockFlowFileRepository implements FlowFileRepository {
 
         private boolean failOnUpdate = false;
         private final AtomicLong idGenerator = new AtomicLong(0L);
+        private final List<RepositoryRecord> updates = new ArrayList<>();
 
         public void setFailOnUpdate(final boolean fail) {
             this.failOnUpdate = fail;
@@ -1516,6 +1588,11 @@ public class TestStandardProcessSession {
             if (failOnUpdate) {
                 throw new IOException("FlowFile Repository told to fail on 
update for unit test");
             }
+            updates.addAll(records);
+        }
+
+        public List<RepositoryRecord> getUpdates() {
+            return updates;
         }
 
         @Override

Reply via email to