Repository: nifi
Updated Branches:
  refs/heads/master 22143e386 -> 91ff810db


NIFI-3088: Ensure that on recovery of the FlowFile Repository, if we find a
FlowFile that maps to an unknown queue, we log a warning that the queue is
missing and drop the FlowFile, rather than throwing a NullPointerException

This closes #1266

Signed-off-by: jpercivall <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/91ff810d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/91ff810d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/91ff810d

Branch: refs/heads/master
Commit: 91ff810dbae71ac9d49bb5bcb0ef6b7c8dac8036
Parents: 22143e3
Author: Mark Payne <[email protected]>
Authored: Wed Nov 23 10:33:06 2016 -0500
Committer: jpercivall <[email protected]>
Committed: Wed Nov 23 11:27:14 2016 -0500

----------------------------------------------------------------------
 .../repository/SchemaRepositoryRecordSerde.java | 20 +++++++++++++++++++-
 .../WriteAheadFlowFileRepository.java           | 10 ++++++++--
 .../schema/RepositoryRecordFieldMap.java        |  2 +-
 .../schema/RepositoryRecordSchema.java          |  1 +
 4 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/91ff810d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
index 916fd76..c0c9d18 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/SchemaRepositoryRecordSerde.java
@@ -38,10 +38,13 @@ import org.apache.nifi.repository.schema.Repetition;
 import org.apache.nifi.repository.schema.SchemaRecordReader;
 import org.apache.nifi.repository.schema.SchemaRecordWriter;
 import org.apache.nifi.repository.schema.SimpleRecordField;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.wali.SerDe;
 import org.wali.UpdateType;
 
 public class SchemaRepositoryRecordSerde extends RepositoryRecordSerde 
implements SerDe<RepositoryRecord> {
+    private static final Logger logger = 
LoggerFactory.getLogger(SchemaRepositoryRecordSerde.class);
     private static final int MAX_ENCODING_VERSION = 1;
 
     private final RecordSchema writeSchema = 
RepositoryRecordSchema.REPOSITORY_RECORD_SCHEMA_V1;
@@ -154,7 +157,19 @@ public class SchemaRepositoryRecordSerde extends 
RepositoryRecordSerde implement
         final String queueId = (String) 
record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
         final FlowFileQueue queue = getFlowFileQueue(queueId);
 
-        return new StandardRepositoryRecord(queue, flowFileRecord);
+        final StandardRepositoryRecord repoRecord = new 
StandardRepositoryRecord(queue, flowFileRecord);
+        requireFlowFileQueue(repoRecord, queueId);
+        return repoRecord;
+    }
+
+    private void requireFlowFileQueue(final StandardRepositoryRecord 
repoRecord, final String queueId) {
+        if (queueId == null || queueId.trim().isEmpty()) {
+            logger.warn("{} does not have a Queue associated with it; this 
record will be discarded", repoRecord.getCurrent());
+            repoRecord.markForAbort();
+        } else if (repoRecord.getOriginalQueue() == null) {
+            logger.warn("{} maps to unknown Queue {}; this record will be 
discarded", repoRecord.getCurrent(), queueId);
+            repoRecord.markForAbort();
+        }
     }
 
     private void populateContentClaim(final StandardFlowFileRecord.Builder 
ffBuilder, final Record record) {
@@ -189,6 +204,9 @@ public class SchemaRepositoryRecordSerde extends 
RepositoryRecordSerde implement
         final StandardRepositoryRecord repoRecord = createRecord(record);
         final String swapLocation = (String) record.getFieldValue(new 
SimpleRecordField(RepositoryRecordSchema.SWAP_LOCATION, FieldType.STRING, 
Repetition.EXACTLY_ONE));
         repoRecord.setSwapLocation(swapLocation);
+
+        final String queueId = (String) 
record.getFieldValue(RepositoryRecordSchema.QUEUE_IDENTIFIER);
+        requireFlowFileQueue(repoRecord, queueId);
         return repoRecord;
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/91ff810d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
index 2a323de..071527c 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/WriteAheadFlowFileRepository.java
@@ -354,6 +354,7 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         }
 
         // Determine the next sequence number for FlowFiles
+        int numFlowFilesMissingQueue = 0;
         long maxId = minimumSequenceNumber;
         for (final RepositoryRecord record : recordList) {
             final long recordId = serdeFactory.getRecordIdentifier(record);
@@ -363,7 +364,9 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
 
             final FlowFileRecord flowFile = record.getCurrent();
             final FlowFileQueue queue = record.getOriginalQueue();
-            if (queue != null) {
+            if (queue == null) {
+                numFlowFilesMissingQueue++;
+            } else {
                 queue.put(flowFile);
             }
         }
@@ -371,7 +374,10 @@ public class WriteAheadFlowFileRepository implements 
FlowFileRepository, SyncLis
         // Set the AtomicLong to 1 more than the max ID so that calls to 
#getNextFlowFileSequence() will
         // return the appropriate number.
         flowFileSequenceGenerator.set(maxId + 1);
-        logger.info("Successfully restored {} FlowFiles", recordList.size());
+        logger.info("Successfully restored {} FlowFiles", recordList.size() - 
numFlowFilesMissingQueue);
+        if (numFlowFilesMissingQueue > 0) {
+            logger.warn("On recovery, found {} FlowFiles whose queue no longer 
exists. These FlowFiles will be dropped.", numFlowFilesMissingQueue);
+        }
 
         final Runnable checkpointRunnable = new Runnable() {
             @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/91ff810d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
index 9804dec..5fe4889 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordFieldMap.java
@@ -65,7 +65,7 @@ public class RepositoryRecordFieldMap implements Record {
                 return contentClaimFieldMap;
             case RepositoryRecordSchema.QUEUE_IDENTIFIER:
                 final FlowFileQueue queue = record.getDestination() == null ? 
record.getOriginalQueue() : record.getDestination();
-                return queue.getIdentifier();
+                return queue == null ? null : queue.getIdentifier();
             default:
                 return null;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/91ff810d/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
index 5887c8a..f99b5d9 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/schema/RepositoryRecordSchema.java
@@ -59,6 +59,7 @@ public class RepositoryRecordSchema {
         final List<RecordField> createOrUpdateFields = new ArrayList<>();
         createOrUpdateFields.add(ACTION_TYPE_FIELD);
         
createOrUpdateFields.addAll(FlowFileSchema.FLOWFILE_SCHEMA_V1.getFields());
+
         createOrUpdateFields.add(new SimpleRecordField(QUEUE_IDENTIFIER, 
FieldType.STRING, Repetition.EXACTLY_ONE));
         createOrUpdateFields.add(new SimpleRecordField(SWAP_LOCATION, 
FieldType.STRING, Repetition.ZERO_OR_ONE));
         final ComplexRecordField createOrUpdate = new 
ComplexRecordField(CREATE_OR_UPDATE_ACTION, Repetition.EXACTLY_ONE, 
createOrUpdateFields);

Reply via email to