NIFI-841 correct patch paths and fixed formatting issues

Signed-off-by: Mark Payne <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c3731703
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c3731703
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c3731703

Branch: refs/heads/master
Commit: c3731703f397574399e14a7b1589bda9e9177fd9
Parents: 35439db
Author: joewitt <[email protected]>
Authored: Mon Aug 17 00:05:03 2015 -0400
Committer: Mark Payne <[email protected]>
Committed: Tue Aug 18 20:06:42 2015 -0400

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      | 508 ++++++++++---------
 1 file changed, 260 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c3731703/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index fe7f125..01fc3c8 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -81,7 +81,9 @@ import org.slf4j.LoggerFactory;
 
 /**
  * <p>
- * Provides a ProcessSession that ensures all accesses, changes and transfers occur in an atomic manner for all FlowFiles including their contents and attributes</p>
+ * Provides a ProcessSession that ensures all accesses, changes and transfers
+ * occur in an atomic manner for all FlowFiles including their contents and
+ * attributes</p>
  * <p>
  * NOT THREAD SAFE</p>
  * <p/>
@@ -93,7 +95,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     // determines how many things must be transferred, removed, modified in order to avoid logging the FlowFile ID's on commit/rollback
     public static final int VERBOSE_LOG_THRESHOLD = 10;
     private static final long MAX_APPENDABLE_CLAIM_SIZE = DataUnit.parseDataSize(
-        NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue();
+            NiFiProperties.getInstance().getMaxAppendableClaimSize(), DataUnit.B).longValue();
     private static final int MAX_FLOWFILES_PER_CLAIM = NiFiProperties.getInstance().getMaxFlowFilesPerClaim();
 
     public static final String DEFAULT_FLOWFILE_PATH = "./";
@@ -154,32 +156,32 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         String description = connectable.toString();
         switch (connectable.getConnectableType()) {
-        case PROCESSOR:
-            final ProcessorNode procNode = (ProcessorNode) connectable;
-            componentType = procNode.getProcessor().getClass().getSimpleName();
-            description = procNode.getProcessor().toString();
-            break;
-        case INPUT_PORT:
-            componentType = "Input Port";
-            break;
-        case OUTPUT_PORT:
-            componentType = "Output Port";
-            break;
-        case REMOTE_INPUT_PORT:
-            componentType = "Remote Input Port";
-            break;
-        case REMOTE_OUTPUT_PORT:
-            componentType = "Remote Output Port";
-            break;
-        case FUNNEL:
-            componentType = "Funnel";
-            break;
-        default:
-            throw new AssertionError("Connectable type is " + connectable.getConnectableType());
+            case PROCESSOR:
+                final ProcessorNode procNode = (ProcessorNode) connectable;
+                componentType = procNode.getProcessor().getClass().getSimpleName();
+                description = procNode.getProcessor().toString();
+                break;
+            case INPUT_PORT:
+                componentType = "Input Port";
+                break;
+            case OUTPUT_PORT:
+                componentType = "Output Port";
+                break;
+            case REMOTE_INPUT_PORT:
+                componentType = "Remote Input Port";
+                break;
+            case REMOTE_OUTPUT_PORT:
+                componentType = "Remote Output Port";
+                break;
+            case FUNNEL:
+                componentType = "Funnel";
+                break;
+            default:
+                throw new AssertionError("Connectable type is " + connectable.getConnectableType());
         }
 
         this.provenanceReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(), componentType,
-            context.getProvenanceRepository(), this);
+                context.getProvenanceRepository(), this);
         this.sessionId = idGenerator.getAndIncrement();
         this.connectableDescription = description;
 
@@ -292,148 +294,147 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
     @SuppressWarnings({"unchecked", "rawtypes"})
     private void commit(final Checkpoint checkpoint) {
-       try {
-               final long commitStartNanos = System.nanoTime();
-       
-               resetWriteClaims(false);
-               resetReadClaim();
-       
-               final long updateProvenanceStart = System.nanoTime();
-               updateProvenanceRepo(checkpoint);
-       
-               final long claimRemovalStart = System.nanoTime();
-               final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;
-       
+        try {
+            final long commitStartNanos = System.nanoTime();
+
+            resetWriteClaims(false);
+            resetReadClaim();
+
+            final long updateProvenanceStart = System.nanoTime();
+            updateProvenanceRepo(checkpoint);
+
+            final long claimRemovalStart = System.nanoTime();
+            final long updateProvenanceNanos = claimRemovalStart - updateProvenanceStart;
+
                // Figure out which content claims can be released.
-               // At this point, we will decrement the Claimant Count for the claims via the Content Repository.
-               // We do not actually destroy the content because otherwise, we could remove the
-               // Original Claim and crash/restart before the FlowFileRepository is updated. This will result in the FlowFile being restored such that
-               // the content claim points to the Original Claim -- which has already been removed!
-               for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
-                   final FlowFile flowFile = entry.getKey();
-                   final StandardRepositoryRecord record = entry.getValue();
-       
-                   if (record.isMarkedForDelete()) {
+            // At this point, we will decrement the Claimant Count for the claims via the Content Repository.
+            // We do not actually destroy the content because otherwise, we could remove the
+            // Original Claim and crash/restart before the FlowFileRepository is updated. This will result in the FlowFile being restored such that
+            // the content claim points to the Original Claim -- which has already been removed!
+            for (final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : checkpoint.records.entrySet()) {
+                final FlowFile flowFile = entry.getKey();
+                final StandardRepositoryRecord record = entry.getValue();
+
+                if (record.isMarkedForDelete()) {
                        // if the working claim is not the same as the original claim, we can immediately destroy the working claim
-                       // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
-                       removeContent(record.getWorkingClaim());
-       
-                       if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
+                    // because it was created in this session and is to be deleted. We don't need to wait for the FlowFile Repo to sync.
+                    removeContent(record.getWorkingClaim());
+
+                    if (record.getOriginalClaim() != null && !record.getOriginalClaim().equals(record.getWorkingClaim())) {
                            // if working & original claim are same, don't remove twice; we only want to remove the original
-                           // if it's different from the working. Otherwise, we remove two claimant counts. This causes
-                           // an issue if we only updated the FlowFile attributes.
-                           removeContent(record.getOriginalClaim());
-                       }
-                       final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
-                       final Connectable connectable = context.getConnectable();
-                       final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
-                       LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
-                   } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
-                       //records which have been updated - remove original if exists
-                       removeContent(record.getOriginalClaim());
-                   }
-               }
-       
-               final long claimRemovalFinishNanos = System.nanoTime();
-               final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart;
-       
-               // Update the FlowFile Repository
-               try {
-                   final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
-                   context.getFlowFileRepository().updateRepository((Collection) repoRecords);
-               } catch (final IOException ioe) {
-                   rollback();
-                   throw new ProcessException("FlowFile Repository failed to update", ioe);
-               }
-               final long flowFileRepoUpdateFinishNanos = System.nanoTime();
-               final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
-       
-               updateEventRepository(checkpoint);
-       
-               final long updateEventRepositoryFinishNanos = System.nanoTime();
-               final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
-       
-               // transfer the flowfiles to the connections' queues.
-               final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
-               for (final StandardRepositoryRecord record : checkpoint.records.values()) {
-                   if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
-                       continue; //these don't need to be transferred
-                   }
+                        // if it's different from the working. Otherwise, we remove two claimant counts. This causes
+                        // an issue if we only updated the FlowFile attributes.
+                        removeContent(record.getOriginalClaim());
+                    }
+                    final long flowFileLife = System.currentTimeMillis() - flowFile.getEntryDate();
+                    final Connectable connectable = context.getConnectable();
+                    final Object terminator = connectable instanceof ProcessorNode ? ((ProcessorNode) connectable).getProcessor() : connectable;
+                    LOG.info("{} terminated by {}; life of FlowFile = {} ms", new Object[]{flowFile, terminator, flowFileLife});
+                } else if (record.isWorking() && record.getWorkingClaim() != record.getOriginalClaim()) {
+                    //records which have been updated - remove original if exists
+                    removeContent(record.getOriginalClaim());
+                }
+            }
+
+            final long claimRemovalFinishNanos = System.nanoTime();
+            final long claimRemovalNanos = claimRemovalFinishNanos - claimRemovalStart;
+
+            // Update the FlowFile Repository
+            try {
+                final Collection<StandardRepositoryRecord> repoRecords = checkpoint.records.values();
+                context.getFlowFileRepository().updateRepository((Collection) repoRecords);
+            } catch (final IOException ioe) {
+                rollback();
+                throw new ProcessException("FlowFile Repository failed to update", ioe);
+            }
+            final long flowFileRepoUpdateFinishNanos = System.nanoTime();
+            final long flowFileRepoUpdateNanos = flowFileRepoUpdateFinishNanos - claimRemovalFinishNanos;
+
+            updateEventRepository(checkpoint);
+
+            final long updateEventRepositoryFinishNanos = System.nanoTime();
+            final long updateEventRepositoryNanos = updateEventRepositoryFinishNanos - claimRemovalFinishNanos;
+
+            // transfer the flowfiles to the connections' queues.
+            final Map<FlowFileQueue, Collection<FlowFileRecord>> recordMap = new HashMap<>();
+            for (final StandardRepositoryRecord record : checkpoint.records.values()) {
+                if (record.isMarkedForAbort() || record.isMarkedForDelete()) {
+                    continue; //these don't need to be transferred
+                }
                    // record.getCurrent() will return null if this record was created in this session --
-                   // in this case, we just ignore it, and it will be cleaned up by clearing the records map.
-                   if (record.getCurrent() != null) {
-                       Collection<FlowFileRecord> collection = recordMap.get(record.getDestination());
-                       if (collection == null) {
-                           collection = new ArrayList<>();
-                           recordMap.put(record.getDestination(), collection);
-                       }
-                       collection.add(record.getCurrent());
-                   }
-               }
-       
-               for (final Map.Entry<FlowFileQueue, Collection<FlowFileRecord>> entry : recordMap.entrySet()) {
-                   entry.getKey().putAll(entry.getValue());
-               }
-       
-               final long enqueueFlowFileFinishNanos = System.nanoTime();
-               final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos;
-       
-               // Delete any files from disk that need to be removed.
-               for (final Path path : checkpoint.deleteOnCommit) {
-                   try {
-                       Files.deleteIfExists(path);
-                   } catch (final IOException e) {
-                       throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), e);
-                   }
-               }
-               checkpoint.deleteOnCommit.clear();
-       
-               if (LOG.isInfoEnabled()) {
-                   final String sessionSummary = summarizeEvents(checkpoint);
-                   if (!sessionSummary.isEmpty()) {
-                       LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary});
-                   }
-               }
-       
-               for (final Map.Entry<String, Long> entry : checkpoint.localCounters.entrySet()) {
-                   adjustCounter(entry.getKey(), entry.getValue(), true);
-               }
-       
-               for (final Map.Entry<String, Long> entry : checkpoint.globalCounters.entrySet()) {
-                   adjustCounter(entry.getKey(), entry.getValue(), true);
-               }
-       
-               acknowledgeRecords();
-               resetState();
-       
-               if (LOG.isDebugEnabled()) {
-                   final StringBuilder timingInfo = new StringBuilder();
-                   timingInfo.append("Session commit for ").append(this).append(" [").append(connectableDescription).append("]").append(" took ");
-       
-                   final long commitNanos = System.nanoTime() - commitStartNanos;
-                   formatNanos(commitNanos, timingInfo);
-                   timingInfo.append("; FlowFile Repository Update took ");
-                   formatNanos(flowFileRepoUpdateNanos, timingInfo);
-                   timingInfo.append("; Claim Removal took ");
-                   formatNanos(claimRemovalNanos, timingInfo);
-                   timingInfo.append("; FlowFile Event Update took ");
-                   formatNanos(updateEventRepositoryNanos, timingInfo);
-                   timingInfo.append("; Enqueuing FlowFiles took ");
-                   formatNanos(enqueueFlowFileNanos, timingInfo);
-                   timingInfo.append("; Updating Provenance Event Repository took ");
-                   formatNanos(updateProvenanceNanos, timingInfo);
-       
-                   LOG.debug(timingInfo.toString());
-               }
-       } catch (final Exception e) {
-               try {
-                       rollback();
-               } catch (final Exception e1) {
-                       e.addSuppressed(e1);
-               }
-               
-               throw e;
-       }
+                // in this case, we just ignore it, and it will be cleaned up by clearing the records map.
+                if (record.getCurrent() != null) {
+                    Collection<FlowFileRecord> collection = recordMap.get(record.getDestination());
+                    if (collection == null) {
+                        collection = new ArrayList<>();
+                        recordMap.put(record.getDestination(), collection);
+                    }
+                    collection.add(record.getCurrent());
+                }
+            }
+
+            for (final Map.Entry<FlowFileQueue, Collection<FlowFileRecord>> entry : recordMap.entrySet()) {
+                entry.getKey().putAll(entry.getValue());
+            }
+
+            final long enqueueFlowFileFinishNanos = System.nanoTime();
+            final long enqueueFlowFileNanos = enqueueFlowFileFinishNanos - updateEventRepositoryFinishNanos;
+
+            // Delete any files from disk that need to be removed.
+            for (final Path path : checkpoint.deleteOnCommit) {
+                try {
+                    Files.deleteIfExists(path);
+                } catch (final IOException e) {
+                    throw new FlowFileAccessException("Unable to delete " + path.toFile().getAbsolutePath(), e);
+                }
+            }
+            checkpoint.deleteOnCommit.clear();
+
+            if (LOG.isInfoEnabled()) {
+                final String sessionSummary = summarizeEvents(checkpoint);
+                if (!sessionSummary.isEmpty()) {
+                    LOG.info("{} for {}, committed the following events: {}", new Object[]{this, connectableDescription, sessionSummary});
+                }
+            }
+
+            for (final Map.Entry<String, Long> entry : checkpoint.localCounters.entrySet()) {
+                adjustCounter(entry.getKey(), entry.getValue(), true);
+            }
+
+            for (final Map.Entry<String, Long> entry : checkpoint.globalCounters.entrySet()) {
+                adjustCounter(entry.getKey(), entry.getValue(), true);
+            }
+
+            acknowledgeRecords();
+            resetState();
+
+            if (LOG.isDebugEnabled()) {
+                final StringBuilder timingInfo = new StringBuilder();
+                timingInfo.append("Session commit for ").append(this).append(" [").append(connectableDescription).append("]").append(" took ");
+
+                final long commitNanos = System.nanoTime() - commitStartNanos;
+                formatNanos(commitNanos, timingInfo);
+                timingInfo.append("; FlowFile Repository Update took ");
+                formatNanos(flowFileRepoUpdateNanos, timingInfo);
+                timingInfo.append("; Claim Removal took ");
+                formatNanos(claimRemovalNanos, timingInfo);
+                timingInfo.append("; FlowFile Event Update took ");
+                formatNanos(updateEventRepositoryNanos, timingInfo);
+                timingInfo.append("; Enqueuing FlowFiles took ");
+                formatNanos(enqueueFlowFileNanos, timingInfo);
+                timingInfo.append("; Updating Provenance Event Repository took ");
+                formatNanos(updateProvenanceNanos, timingInfo);
+
+                LOG.debug(timingInfo.toString());
+            }
+        } catch (final Exception e) {
+            try {
+                rollback();
+            } catch (final Exception e1) {
+                e.addSuppressed(e1);
+            }
+            throw e;
+        }
     }
 
     private void updateEventRepository(final Checkpoint checkpoint) {
@@ -448,16 +449,16 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             }
 
             switch (event.getEventType()) {
-            case SEND:
-                flowFilesSent++;
-                bytesSent += event.getFileSize();
-                break;
-            case RECEIVE:
-                flowFilesReceived++;
-                bytesReceived += event.getFileSize();
-                break;
-            default:
-                break;
+                case SEND:
+                    flowFilesSent++;
+                    bytesSent += event.getFileSize();
+                    break;
+                case RECEIVE:
+                    flowFilesReceived++;
+                    bytesReceived += event.getFileSize();
+                    break;
+                default:
+                    break;
             }
         }
 
@@ -610,9 +611,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 boolean creationEventRegistered = false;
                 if (registeredTypes != null) {
                     if (registeredTypes.contains(ProvenanceEventType.CREATE)
-                        || registeredTypes.contains(ProvenanceEventType.FORK)
-                        || registeredTypes.contains(ProvenanceEventType.JOIN)
-                        || registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
+                            || registeredTypes.contains(ProvenanceEventType.FORK)
+                            || registeredTypes.contains(ProvenanceEventType.JOIN)
+                            || registeredTypes.contains(ProvenanceEventType.RECEIVE)) {
                         creationEventRegistered = true;
                     }
                 }
@@ -701,11 +702,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             builder.setCurrentContentClaim(null, null, null, null, 0L);
         } else {
             builder.setCurrentContentClaim(
-                originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
-                );
+                    originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
+            );
             builder.setPreviousContentClaim(
-                originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
-                );
+                    originalClaim.getContainer(), originalClaim.getSection(), originalClaim.getId(), repoRecord.getOriginal().getContentClaimOffset(), repoRecord.getOriginal().getSize()
+            );
         }
     }
 
@@ -741,7 +742,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private StandardProvenanceEventRecord enrich(
-        final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
+            final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
         final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
         final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
         if (eventFlowFile != null) {
@@ -781,8 +782,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     /**
-     * Checks if the given event is a spurious FORK, meaning that the FORK has a single child and that child was removed in this session. This happens when a Processor calls #create(FlowFile) and then
-     * removes the created FlowFile.
+     * Checks if the given event is a spurious FORK, meaning that the FORK has a
+     * single child and that child was removed in this session. This happens
+     * when a Processor calls #create(FlowFile) and then removes the created
+     * FlowFile.
      *
      * @param event event
      * @return true if spurious fork
@@ -799,8 +802,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     /**
-     * Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile was routed to a relationship with only 1 connection and that Connection is the Connection from
-     * which the FlowFile was pulled. I.e., the FlowFile was really routed nowhere.
+     * Checks if the given event is a spurious ROUTE, meaning that the ROUTE
+     * indicates that a FlowFile was routed to a relationship with only 1
+     * connection and that Connection is the Connection from which the FlowFile
+     * was pulled. I.e., the FlowFile was really routed nowhere.
      *
      * @param event event
      * @param records records
@@ -1010,7 +1015,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         final StringBuilder sb = new StringBuilder(512);
         if (!LOG.isDebugEnabled() && (largestTransferSetSize > VERBOSE_LOG_THRESHOLD
-            || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
+                || numModified > VERBOSE_LOG_THRESHOLD || numCreated > VERBOSE_LOG_THRESHOLD || numRemoved > VERBOSE_LOG_THRESHOLD)) {
             if (numCreated > 0) {
                 sb.append("created ").append(numCreated).append(" FlowFiles, ");
             }
@@ -1258,8 +1263,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         attrs.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
 
         final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
-            .addAttributes(attrs)
-            .build();
+                .addAttributes(attrs)
+                .build();
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
         record.setWorking(fFile, attrs);
         records.put(fFile, record);
@@ -1621,7 +1626,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         final StandardProvenanceReporter expiredReporter = new StandardProvenanceReporter(this, connectable.getIdentifier(),
-            processorType, context.getProvenanceRepository(), this);
+                processorType, context.getProvenanceRepository(), this);
 
         final Map<String, FlowFileRecord> recordIdMap = new HashMap<>();
         for (final FlowFileRecord flowFile : flowFiles) {
@@ -1748,9 +1753,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         try (final InputStream rawIn = getInputStream(source, record.getCurrentClaim(), record.getCurrentClaimOffset());
-            final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
-            final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
-            final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
+                final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+                final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+                final ByteCountingInputStream countingStream = new ByteCountingInputStream(disableOnCloseIn, this.bytesRead)) {
 
             // We want to differentiate between IOExceptions thrown by the repository and IOExceptions thrown from
             // Processor code. As a result, as have the FlowFileAccessInputStream that catches IOException from the repository
@@ -1821,7 +1826,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         try {
             try (final OutputStream rawOut = contentRepo.write(newClaim);
-                final OutputStream out = new BufferedOutputStream(rawOut)) {
+                    final OutputStream out = new BufferedOutputStream(rawOut)) {
 
                 if (header != null && header.length > 0) {
                     out.write(header);
@@ -1928,7 +1933,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 ensureNotAppending(newClaim);
 
                 try (final OutputStream disableOnClose = new DisableOnCloseOutputStream(currentWriteClaimStream);
-                    final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
+                        final OutputStream countingOut = new ByteCountingOutputStream(disableOnClose, writtenHolder)) {
 
                     recursionSet.add(source);
 
@@ -1951,7 +1956,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
                 ensureNotAppending(newClaim);
                 try (final OutputStream stream = context.getContentRepository().write(newClaim);
-                    final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) {
+                        final OutputStream countingOut = new ByteCountingOutputStream(stream, writtenHolder)) {
                     recursionSet.add(source);
 
                     writeRecursionLevel++;
@@ -2092,10 +2097,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     /**
-     * Checks if the ContentClaim associated with this record should be removed, since the record is about to be updated to point to a new content claim. If so, removes the working claim.
+     * Checks if the ContentClaim associated with this record should be removed,
+     * since the record is about to be updated to point to a new content claim.
+     * If so, removes the working claim.
      *
-     * This happens if & only if the content of this FlowFile has been modified since it was last committed to the FlowFile repository, because this indicates that the content is no longer needed and
-     * should be cleaned up.
+     * This happens if & only if the content of this FlowFile has been modified
+     * since it was last committed to the FlowFile repository, because this
+     * indicates that the content is no longer needed and should be cleaned up.
      *
      * @param record record
      */
@@ -2125,22 +2133,22 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     private void resetWriteClaims() {
-       resetWriteClaims(true);
+        resetWriteClaims(true);
     }
-    
+
     private void resetWriteClaims(final boolean suppressExceptions) {
         try {
             if (currentWriteClaimStream != null) {
-               try {
-                       currentWriteClaimStream.flush();
-               } finally {
-                       currentWriteClaimStream.close();
-               }
+                try {
+                    currentWriteClaimStream.flush();
+                } finally {
+                    currentWriteClaimStream.close();
+                }
             }
         } catch (final IOException e) {
-               if (!suppressExceptions) {
-                       throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository");
-               }
+            if (!suppressExceptions) {
+                throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository");
+            }
         }
 
         currentWriteClaimStream = null;
@@ -2150,15 +2158,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         for (final ByteCountingOutputStream out : appendableStreams.values()) {
             try {
-               try {
-                       out.flush();
-               } finally {
-                       out.close();
-               }
+                try {
+                    out.flush();
+                } finally {
+                    out.close();
+                }
             } catch (final IOException e) {
-               if (!suppressExceptions) {
-                       throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository");
-               }
+                if (!suppressExceptions) {
+                    throw new FlowFileAccessException("Unable to flush the output of FlowFile to the Content Repository");
+                }
             }
         }
         appendableStreams.clear();
@@ -2176,7 +2184,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     /**
-     * @return Indicates whether or not multiple FlowFiles should be merged into a single ContentClaim
+     * @return Indicates whether or not multiple FlowFiles should be merged into
+     * a single ContentClaim
      */
     private boolean isMergeContent() {
         if (writeRecursionLevel > 0) {
@@ -2204,11 +2213,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 ensureNotAppending(newClaim);
 
                 try (final InputStream rawIn = getInputStream(source, currClaim, record.getCurrentClaimOffset());
-                    final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
-                    final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
-                    final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
-                    final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream);
-                    final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
+                        final InputStream limitedIn = new LimitedInputStream(rawIn, source.getSize());
+                        final InputStream disableOnCloseIn = new DisableOnCloseInputStream(limitedIn);
+                        final InputStream countingIn = new ByteCountingInputStream(disableOnCloseIn, bytesRead);
+                        final OutputStream disableOnCloseOut = new DisableOnCloseOutputStream(currentWriteClaimStream);
+                        final OutputStream countingOut = new ByteCountingOutputStream(disableOnCloseOut, writtenHolder)) {
 
                     recursionSet.add(source);
 
@@ -2247,10 +2256,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 ensureNotAppending(newClaim);
 
                 try (final InputStream is = getInputStream(source, currClaim, record.getCurrentClaimOffset());
-                    final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
-                    final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead);
-                    final OutputStream os = context.getContentRepository().write(newClaim);
-                    final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) {
+                        final InputStream limitedIn = new LimitedInputStream(is, source.getSize());
+                        final InputStream countingIn = new ByteCountingInputStream(limitedIn, bytesRead);
+                        final OutputStream os = context.getContentRepository().write(newClaim);
+                        final OutputStream countingOut = new ByteCountingOutputStream(os, writtenHolder)) {
 
                     recursionSet.add(source);
 
@@ -2363,9 +2372,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         removeTemporaryClaim(record);
 
         final FlowFileRecord newFile = new StandardFlowFileRecord.Builder().fromFlowFile(record.getCurrent())
-            .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
-            .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
-            .build();
+                .contentClaim(newClaim).contentClaimOffset(claimOffset).size(newSize)
+                .addAttribute(CoreAttributes.FILENAME.key(), source.toFile().getName())
+                .build();
         record.setWorking(newFile, CoreAttributes.FILENAME.key(), source.toFile().getName());
         if (!keepSourceFile) {
             deleteOnCommit.add(source);
@@ -2520,7 +2529,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
      * Checks if a FlowFile is known in this session.
      *
      * @param flowFile the FlowFile to check
-     * @return <code>true</code> if the FlowFile is known in this session, <code>false</code> otherwise.
+     * @return <code>true</code> if the FlowFile is known in this session,
+     * <code>false</code> otherwise.
      */
     boolean isFlowFileKnown(final FlowFile flowFile) {
         return records.containsKey(flowFile);
@@ -2542,8 +2552,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             final String key = entry.getKey();
             final String value = entry.getValue();
             if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
-                || CoreAttributes.DISCARD_REASON.key().equals(key)
-                || CoreAttributes.UUID.key().equals(key)) {
+                    || CoreAttributes.DISCARD_REASON.key().equals(key)
+                    || CoreAttributes.UUID.key().equals(key)) {
                 continue;
             }
             newAttributes.put(key, value);
@@ -2591,10 +2601,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         newAttributes.put(CoreAttributes.UUID.key(), UUID.randomUUID().toString());
 
         final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
-            .addAttributes(newAttributes)
-            .lineageIdentifiers(lineageIdentifiers)
-            .lineageStartDate(lineageStartDate)
-            .build();
+                .addAttributes(newAttributes)
+                .lineageIdentifiers(lineageIdentifiers)
+                .lineageStartDate(lineageStartDate)
+                .build();
 
         final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
         record.setWorking(fFile, newAttributes);
@@ -2606,7 +2616,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     /**
-     * Returns the attributes that are common to every FlowFile given. The key and value must match exactly.
+     * Returns the attributes that are common to every FlowFile given. The key
+     * and value must match exactly.
      *
      * @param flowFileList a list of FlowFiles
      *
@@ -2628,18 +2639,18 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         final Map<String, String> firstMap = flowFileList.iterator().next().getAttributes();
 
         outer:
-            for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
-                final String key = mapEntry.getKey();
-                final String value = mapEntry.getValue();
-                for (final FlowFile flowFile : flowFileList) {
-                    final Map<String, String> currMap = flowFile.getAttributes();
-                    final String curVal = currMap.get(key);
-                    if (curVal == null || !curVal.equals(value)) {
-                        continue outer;
-                    }
+        for (final Map.Entry<String, String> mapEntry : firstMap.entrySet()) {
+            final String key = mapEntry.getKey();
+            final String value = mapEntry.getValue();
+            for (final FlowFile flowFile : flowFileList) {
+                final Map<String, String> currMap = flowFile.getAttributes();
+                final String curVal = currMap.get(key);
+                if (curVal == null || !curVal.equals(value)) {
+                    continue outer;
                 }
-                result.put(key, value);
             }
+            result.put(key, value);
+        }
 
         return result;
     }
@@ -2661,7 +2672,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     /**
-     * Callback interface used to poll a FlowFileQueue, in order to perform functional programming-type of polling a queue
+     * Callback interface used to poll a FlowFileQueue, in order to perform
      */
     private static interface QueuePoller {
 
