ijokarumawak commented on a change in pull request #3438: NIFI-6220: If 
FlowFile is created by cloning a relationship, do not c…
URL: https://github.com/apache/nifi/pull/3438#discussion_r276913052
 
 

 ##########
 File path: 
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 ##########
 @@ -1677,6 +1688,97 @@ public FlowFile create() {
         return fFile;
     }
 
+    @Override
+    public FlowFile create(FlowFile parent) {
+        verifyTaskActive();
+        parent = getMostRecent(parent);
+
+        final String uuid = UUID.randomUUID().toString();
+
+        final Map<String, String> newAttributes = new HashMap<>(3);
+        newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
+        newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
+        newAttributes.put(CoreAttributes.UUID.key(), uuid);
+
+        final StandardFlowFileRecord.Builder fFileBuilder = new 
StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence());
+
+        // copy all attributes from parent except for the "special" 
attributes. Copying the special attributes
+        // can cause problems -- especially the ALTERNATE_IDENTIFIER, because 
copying can cause Provenance Events
+        // to be incorrectly created.
+        for (final Map.Entry<String, String> entry : 
parent.getAttributes().entrySet()) {
+            final String key = entry.getKey();
+            final String value = entry.getValue();
+            if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
+                || CoreAttributes.DISCARD_REASON.key().equals(key)
+                || CoreAttributes.UUID.key().equals(key)) {
+                continue;
+            }
+            newAttributes.put(key, value);
+        }
+
+        fFileBuilder.lineageStart(parent.getLineageStartDate(), 
parent.getLineageStartIndex());
+        fFileBuilder.addAttributes(newAttributes);
+
+        final FlowFileRecord fFile = fFileBuilder.build();
+        final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
+        record.setWorking(fFile, newAttributes);
+        records.put(fFile.getId(), record);
+        createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
+
+        registerForkEvent(parent, fFile);
+        return fFile;
+    }
+
+    @Override
+    public FlowFile create(Collection<FlowFile> parents) {
+        verifyTaskActive();
+
+        parents = 
parents.stream().map(this::getMostRecent).collect(Collectors.toList());
+
+        final Map<String, String> newAttributes = intersectAttributes(parents);
+        newAttributes.remove(CoreAttributes.UUID.key());
+        newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key());
+        newAttributes.remove(CoreAttributes.DISCARD_REASON.key());
+
+        // When creating a new FlowFile from multiple parents, we need to add 
all of the Lineage Identifiers
+        // and use the earliest lineage start date
+        long lineageStartDate = 0L;
+        for (final FlowFile parent : parents) {
+
+            final long parentLineageStartDate = parent.getLineageStartDate();
+            if (lineageStartDate == 0L || parentLineageStartDate < 
lineageStartDate) {
+                lineageStartDate = parentLineageStartDate;
+            }
+        }
+
+        // find the smallest lineage start index that has the same lineage 
start date as the one we've chosen.
+        long lineageStartIndex = 0L;
+        for (final FlowFile parent : parents) {
+            if (parent.getLineageStartDate() == lineageStartDate && 
parent.getLineageStartIndex() < lineageStartIndex) {
+                lineageStartIndex = parent.getLineageStartIndex();
+            }
+        }
+
+        final String uuid = UUID.randomUUID().toString();
+        newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
+        newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
+        newAttributes.put(CoreAttributes.UUID.key(), uuid);
+
+        final FlowFileRecord fFile = new 
StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
+            .addAttributes(newAttributes)
+            .lineageStart(lineageStartDate, lineageStartIndex)
+            .build();
+
+        final StandardRepositoryRecord record = new 
StandardRepositoryRecord(null);
+        record.setWorking(fFile, newAttributes);
+        records.put(fFile.getId(), record);
+        createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
+
+        registerJoinEvent(fFile, parents);
+        return fFile;
+    }
+
+
 
 Review comment:
   The contents of these methods have not changed, have they? If so, it would 
have been easier for reviewers to find the changed lines if these unchanged 
methods had not shown up in the diff.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to