Repository: incubator-nifi
Updated Branches:
  refs/heads/develop fb75a4ba6 -> efc862eac


NIFI-647: When a FORK event is emitted by a processor, the framework generates
CREATE events for each child in addition to the FORK event; this commit fixes that bug


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/9fbd88f0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/9fbd88f0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/9fbd88f0

Branch: refs/heads/develop
Commit: 9fbd88f034e6d5006b0f9cc7c4ffcd09c7c65a01
Parents: 310ae3e
Author: Mark Payne <[email protected]>
Authored: Thu Jun 4 12:42:56 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Thu Jun 4 12:42:56 2015 -0400

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      | 173 ++++++++++---------
 1 file changed, 88 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/9fbd88f0/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 2b05b47..a048d21 100644
--- 
a/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ 
b/nifi/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -93,7 +93,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     // determines how many things must be transferred, removed, modified in 
order to avoid logging the FlowFile ID's on commit/rollback
     public static final int VERBOSE_LOG_THRESHOLD = 10;
     private static final long MAX_APPENDABLE_CLAIM_SIZE = 
DataUnit.parseDataSize(
-            NiFiProperties.getInstance().getMaxAppendableClaimSize(), 
DataUnit.B).longValue();
+        NiFiProperties.getInstance().getMaxAppendableClaimSize(), 
DataUnit.B).longValue();
     private static final int MAX_FLOWFILES_PER_CLAIM = 
NiFiProperties.getInstance().getMaxFlowFilesPerClaim();
 
     public static final String DEFAULT_FLOWFILE_PATH = "./";
@@ -154,28 +154,28 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
         String description = connectable.toString();
         switch (connectable.getConnectableType()) {
-            case PROCESSOR:
-                final ProcessorNode procNode = (ProcessorNode) connectable;
-                componentType = 
procNode.getProcessor().getClass().getSimpleName();
-                description = procNode.getProcessor().toString();
-                break;
-            case INPUT_PORT:
-                componentType = "Input Port";
-                break;
-            case OUTPUT_PORT:
-                componentType = "Output Port";
-                break;
-            case REMOTE_INPUT_PORT:
-                componentType = "Remote Input Port";
-                break;
-            case REMOTE_OUTPUT_PORT:
-                componentType = "Remote Output Port";
-                break;
-            case FUNNEL:
-                componentType = "Funnel";
-                break;
-            default:
-                throw new AssertionError("Connectable type is " + 
connectable.getConnectableType());
+        case PROCESSOR:
+            final ProcessorNode procNode = (ProcessorNode) connectable;
+            componentType = procNode.getProcessor().getClass().getSimpleName();
+            description = procNode.getProcessor().toString();
+            break;
+        case INPUT_PORT:
+            componentType = "Input Port";
+            break;
+        case OUTPUT_PORT:
+            componentType = "Output Port";
+            break;
+        case REMOTE_INPUT_PORT:
+            componentType = "Remote Input Port";
+            break;
+        case REMOTE_OUTPUT_PORT:
+            componentType = "Remote Output Port";
+            break;
+        case FUNNEL:
+            componentType = "Funnel";
+            break;
+        default:
+            throw new AssertionError("Connectable type is " + 
connectable.getConnectableType());
         }
 
         this.provenanceReporter = new 
StandardProvenanceReporter(connectable.getIdentifier(), componentType, 
context.getProvenanceRepository(), this);
@@ -437,16 +437,16 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             }
 
             switch (event.getEventType()) {
-                case SEND:
-                    flowFilesSent++;
-                    bytesSent += event.getFileSize();
-                    break;
-                case RECEIVE:
-                    flowFilesReceived++;
-                    bytesReceived += event.getFileSize();
-                    break;
-                default:
-                    break;
+            case SEND:
+                flowFilesSent++;
+                bytesSent += event.getFileSize();
+                break;
+            case RECEIVE:
+                flowFilesReceived++;
+                bytesReceived += event.getFileSize();
+                break;
+            default:
+                break;
             }
         }
 
@@ -519,9 +519,13 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             updateEventContentClaims(builder, flowFile, 
checkpoint.records.get(flowFile));
             final ProvenanceEventRecord event = builder.build();
 
-            if (!event.getChildUuids().isEmpty() && 
!isSpuriousForkEvent(event, checkpoint.removedFlowFiles) && 
!processorGenerated.contains(event)) {
-                recordsToSubmit.add(event);
+            if (!event.getChildUuids().isEmpty() && 
!isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
+                // If framework generated the event, add it to the 
'recordsToSubmit' Set.
+                if (!processorGenerated.contains(event)) {
+                    recordsToSubmit.add(event);
+                }
 
+                // Register the FORK event for each child and each parent.
                 for (final String childUuid : event.getChildUuids()) {
                     addEventType(eventTypesPerFlowFileId, childUuid, 
event.getEventType());
                 }
@@ -536,13 +540,12 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
                 continue;
             }
-            if (isSpuriousRouteEvent(event, checkpoint.records)) {
-                continue;
-            }
 
             // Check if the event indicates that the FlowFile was routed to 
the same
             // connection from which it was pulled (and only this connection). 
If so, discard the event.
-            isSpuriousRouteEvent(event, checkpoint.records);
+            if (isSpuriousRouteEvent(event, checkpoint.records)) {
+                continue;
+            }
 
             recordsToSubmit.add(event);
             addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), 
event.getEventType());
@@ -596,9 +599,9 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 boolean creationEventRegistered = false;
                 if (registeredTypes != null) {
                     if (registeredTypes.contains(ProvenanceEventType.CREATE)
-                            || 
registeredTypes.contains(ProvenanceEventType.FORK)
-                            || 
registeredTypes.contains(ProvenanceEventType.JOIN)
-                            || 
registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
+                        || registeredTypes.contains(ProvenanceEventType.FORK)
+                        || registeredTypes.contains(ProvenanceEventType.JOIN)
+                        || 
registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
                         creationEventRegistered = true;
                     }
                 }
@@ -687,11 +690,11 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             builder.setCurrentContentClaim(null, null, null, null, 0L);
         } else {
             builder.setCurrentContentClaim(
-                    originalClaim.getContainer(), originalClaim.getSection(), 
originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), 
repoRecord.getOriginal().getSize()
-            );
+                originalClaim.getContainer(), originalClaim.getSection(), 
originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), 
repoRecord.getOriginal().getSize()
+                );
             builder.setPreviousContentClaim(
-                    originalClaim.getContainer(), originalClaim.getSection(), 
originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), 
repoRecord.getOriginal().getSize()
-            );
+                originalClaim.getContainer(), originalClaim.getSection(), 
originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), 
repoRecord.getOriginal().getSize()
+                );
         }
     }
 
@@ -727,7 +730,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
     }
 
     private StandardProvenanceEventRecord enrich(
-            final ProvenanceEventRecord rawEvent, final Map<String, 
FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, 
StandardRepositoryRecord> records, final boolean updateAttributes) {
+        final ProvenanceEventRecord rawEvent, final Map<String, 
FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, 
StandardRepositoryRecord> records, final boolean updateAttributes) {
         final StandardProvenanceEventRecord.Builder recordBuilder = new 
StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
         final FlowFileRecord eventFlowFile = 
flowFileRecordMap.get(rawEvent.getFlowFileUuid());
         if (eventFlowFile != null) {
@@ -996,7 +999,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
         final StringBuilder sb = new StringBuilder(512);
         if (!LOG.isDebugEnabled() && (largestTransferSetSize > 
VERBOSE_LOG_THRESHOLD
-                || numModified > VERBOSE_LOG_THRESHOLD || numCreated > 
VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
+            || numModified > VERBOSE_LOG_THRESHOLD || numCreated > 
VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
             if (numCreated > 0) {
                 sb.append("created ").append(numCreated).append(" FlowFiles, 
");
             }
@@ -1244,8 +1247,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
 
         final FlowFileRecord fFile = new 
StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
-                .addAttributes(attrs)
-                .build();
+            .addAttributes(attrs)
+            .build();
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
         record.setWorking(fFile, attrs);
         records.put(fFile, record);
@@ -1607,7 +1610,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         }
 
         final StandardProvenanceReporter expiredReporter = new 
StandardProvenanceReporter(connectable.getIdentifier(),
-                processorType, context.getProvenanceRepository(), this);
+            processorType, context.getProvenanceRepository(), this);
 
         final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
         for (final FlowFileRecord flowFile : flowFiles) {
@@ -1734,9 +1737,9 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         }
 
         try (final InputStream rawIn = getInputStream(source, 
record.getCurrentClaim(), record.getCurrentClaimOffset());
-                final InputStream limitedIn = new LimitedInputStream(rawIn, 
source.getSize());
-                final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
-                final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+            final InputStream limitedIn = new LimitedInputStream(rawIn, 
source.getSize());
+            final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
+            final ByteCountingInputStream countingStream = new 
ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
 
             // We want to differentiate between IOExceptions thrown by the 
repository and IOExceptions thrown from
             // Processor code. As a result, as have the 
FlowFileAccessInputStream that catches IOException from the repository
@@ -1807,7 +1810,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
         try {
             try (final OutputStream rawOut = contentRepo.write(newClaim);
-                    final OutputStream out = new BufferedOutputStream(rawOut)) 
{
+                final OutputStream out = new BufferedOutputStream(rawOut)) {
 
                 if (header != null && header.length > 0) {
                     out.write(header);
@@ -1914,7 +1917,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 ensureNotAppending(newClaim);
 
                 try (final OutputStream disableOnClose = new 
DisableOnCloseOutputStream(currentWriteClaimStream);
-                        final OutputStream countingOut = new 
ByteCountingOutputStream(disableOnClose, writtenHolder)) {
+                    final OutputStream countingOut = new 
ByteCountingOutputStream(disableOnClose, writtenHolder)) {
 
                     recursionSet.add(source);
 
@@ -1937,7 +1940,7 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
 
                 ensureNotAppending(newClaim);
                 try (final OutputStream stream = 
context.getContentRepository().write(newClaim);
-                        final OutputStream countingOut = new 
ByteCountingOutputStream(stream, writtenHolder)) {
+                    final OutputStream countingOut = new 
ByteCountingOutputStream(stream, writtenHolder)) {
                     recursionSet.add(source);
 
                     writeRecursionLevel++;
@@ -2173,11 +2176,11 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 ensureNotAppending(newClaim);
 
                 try (final InputStream rawIn = getInputStream(source, 
currClaim, record.getCurrentClaimOffset());
-                        final InputStream limitedIn = new 
LimitedInputStream(rawIn, source.getSize());
-                        final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
-                        final InputStream countingIn = new 
ByteCountingInputStream(disableOnCloseIn, bytesRead);
-                        final OutputStream disableOnCloseOut = new 
DisableOnCloseOutputStream(currentWriteClaimStream);
-                        final OutputStream countingOut = new 
ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
+                    final InputStream limitedIn = new 
LimitedInputStream(rawIn, source.getSize());
+                    final InputStream disableOnCloseIn = new 
DisableOnCloseInputStream(limitedIn);
+                    final InputStream countingIn = new 
ByteCountingInputStream(disableOnCloseIn, bytesRead);
+                    final OutputStream disableOnCloseOut = new 
DisableOnCloseOutputStream(currentWriteClaimStream);
+                    final OutputStream countingOut = new 
ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
 
                     recursionSet.add(source);
 
@@ -2216,10 +2219,10 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
                 ensureNotAppending(newClaim);
 
                 try (final InputStream is = getInputStream(source, currClaim, 
record.getCurrentClaimOffset());
-                        final InputStream limitedIn = new 
LimitedInputStream(is, source.getSize());
-                        final InputStream countingIn = new 
ByteCountingInputStream(limitedIn, bytesRead);
-                        final OutputStream os = 
context.getContentRepository().write(newClaim);
-                        final OutputStream countingOut = new 
ByteCountingOutputStream(os, writtenHolder)) {
+                    final InputStream limitedIn = new LimitedInputStream(is, 
source.getSize());
+                    final InputStream countingIn = new 
ByteCountingInputStream(limitedIn, bytesRead);
+                    final OutputStream os = 
context.getContentRepository().write(newClaim);
+                    final OutputStream countingOut = new 
ByteCountingOutputStream(os, writtenHolder)) {
 
                     recursionSet.add(source);
 
@@ -2332,9 +2335,9 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         removeTemporaryClaim(record);
 
         final FlowFileRecord newFile = new 
StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
-                
.contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
-                .addAttribute(CoreAttributes.FILENAME.key(), 
source.toFile().getName())
-                .build();
+            
.contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
+            .addAttribute(CoreAttributes.FILENAME.key(), 
source.toFile().getName())
+            .build();
         record.setWorking(newFile, CoreAttributes.FILENAME.key(), 
source.toFile().getName());
         if (!keepSourceFile) {
             deleteOnCommit.add(source);
@@ -2501,8 +2504,8 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
             final String key = entry.getKey();
             final String value = entry.getValue();
             if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
-                    || CoreAttributes.DISCARD_REASON.key().equals(key)
-                    || CoreAttributes.UUID.key().equals(key)) {
+                || CoreAttributes.DISCARD_REASON.key().equals(key)
+                || CoreAttributes.UUID.key().equals(key)) {
                 continue;
             }
             newAttributes.put(key, value);
@@ -2547,10 +2550,10 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         newAttributes.put(CoreAttributes.UUID.key(), 
UUID.randomUUID().toString());
 
         final FlowFileRecord fFile = new 
StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
-                .addAttributes(newAttributes)
-                .lineageIdentifiers(lineageIdentifiers)
-                .lineageStartDate(lineageStartDate)
-                .build();
+            .addAttributes(newAttributes)
+            .lineageIdentifiers(lineageIdentifiers)
+            .lineageStartDate(lineageStartDate)
+            .build();
 
         final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
         record.setWorking(fFile, newAttributes);
@@ -2584,18 +2587,18 @@ public final class StandardProcessSession implements 
ProcessSession, ProvenanceE
         final Map<String, String> firstMap = 
flowFileList.iterator().next().getAttributes();
 
         outer:
-        for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
-            final String key = mapEntry.getKey();
-            final String value = mapEntry.getValue();
-            for (final FlowFile flowFile : flowFileList) {
-                final Map<String, String> currMap = flowFile.getAttributes();
-                final String curVal = currMap.get(key);
-                if (curVal == null || !curVal.equals(value)) {
-                    continue outer;
+            for (final Map.Entry<String, String> mapEntry : 
firstMap.entrySet()) {
+                final String key = mapEntry.getKey();
+                final String value = mapEntry.getValue();
+                for (final FlowFile flowFile : flowFileList) {
+                    final Map<String, String> currMap = 
flowFile.getAttributes();
+                    final String curVal = currMap.get(key);
+                    if (curVal == null || !curVal.equals(value)) {
+                        continue outer;
+                    }
                 }
+                result.put(key, value);
             }
-            result.put(key, value);
-        }
 
         return result;
     }

Reply via email to