ijokarumawak commented on a change in pull request #3438: NIFI-6220: If
FlowFile is created by cloning a relationship, do not c…
URL: https://github.com/apache/nifi/pull/3438#discussion_r276913052
##########
File path:
nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
##########
@@ -1677,6 +1688,97 @@ public FlowFile create() {
return fFile;
}
+ @Override
+ public FlowFile create(FlowFile parent) {
+ verifyTaskActive();
+ parent = getMostRecent(parent);
+
+ final String uuid = UUID.randomUUID().toString();
+
+ final Map<String, String> newAttributes = new HashMap<>(3);
+ newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
+ newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
+ newAttributes.put(CoreAttributes.UUID.key(), uuid);
+
+ final StandardFlowFileRecord.Builder fFileBuilder = new
StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence());
+
+ // copy all attributes from parent except for the "special"
attributes. Copying the special attributes
+ // can cause problems -- especially the ALTERNATE_IDENTIFIER, because
copying can cause Provenance Events
+ // to be incorrectly created.
+ for (final Map.Entry<String, String> entry :
parent.getAttributes().entrySet()) {
+ final String key = entry.getKey();
+ final String value = entry.getValue();
+ if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
+ || CoreAttributes.DISCARD_REASON.key().equals(key)
+ || CoreAttributes.UUID.key().equals(key)) {
+ continue;
+ }
+ newAttributes.put(key, value);
+ }
+
+ fFileBuilder.lineageStart(parent.getLineageStartDate(),
parent.getLineageStartIndex());
+ fFileBuilder.addAttributes(newAttributes);
+
+ final FlowFileRecord fFile = fFileBuilder.build();
+ final StandardRepositoryRecord record = new
StandardRepositoryRecord(null);
+ record.setWorking(fFile, newAttributes);
+ records.put(fFile.getId(), record);
+ createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
+
+ registerForkEvent(parent, fFile);
+ return fFile;
+ }
+
+ @Override
+ public FlowFile create(Collection<FlowFile> parents) {
+ verifyTaskActive();
+
+ parents =
parents.stream().map(this::getMostRecent).collect(Collectors.toList());
+
+ final Map<String, String> newAttributes = intersectAttributes(parents);
+ newAttributes.remove(CoreAttributes.UUID.key());
+ newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key());
+ newAttributes.remove(CoreAttributes.DISCARD_REASON.key());
+
+ // When creating a new FlowFile from multiple parents, we need to add
all of the Lineage Identifiers
+ // and use the earliest lineage start date
+ long lineageStartDate = 0L;
+ for (final FlowFile parent : parents) {
+
+ final long parentLineageStartDate = parent.getLineageStartDate();
+ if (lineageStartDate == 0L || parentLineageStartDate <
lineageStartDate) {
+ lineageStartDate = parentLineageStartDate;
+ }
+ }
+
+ // find the smallest lineage start index that has the same lineage
start date as the one we've chosen.
+ long lineageStartIndex = 0L;
+ for (final FlowFile parent : parents) {
+ if (parent.getLineageStartDate() == lineageStartDate &&
parent.getLineageStartIndex() < lineageStartIndex) {
+ lineageStartIndex = parent.getLineageStartIndex();
+ }
+ }
+
+ final String uuid = UUID.randomUUID().toString();
+ newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
+ newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
+ newAttributes.put(CoreAttributes.UUID.key(), uuid);
+
+ final FlowFileRecord fFile = new
StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
+ .addAttributes(newAttributes)
+ .lineageStart(lineageStartDate, lineageStartIndex)
+ .build();
+
+ final StandardRepositoryRecord record = new
StandardRepositoryRecord(null);
+ record.setWorking(fFile, newAttributes);
+ records.put(fFile.getId(), record);
+ createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
+
+ registerJoinEvent(fFile, parents);
+ return fFile;
+ }
+
+
Review comment:
Contents of these methods are not changed, are they? If so, it'd have been
easy for reviewers to look changed lines.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services