This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 8382b0a7a3 NIFI-11540 Removed legacy segment attributes from MergeContent
8382b0a7a3 is described below
commit 8382b0a7a3a2735e9047db660df4033c9976c911
Author: Jeyassri Balchandran <[email protected]>
AuthorDate: Fri May 26 19:13:46 2023 +0530
NIFI-11540 Removed legacy segment attributes from MergeContent
This closes #7305
Co-authored-by: David Handermann <[email protected]>
Signed-off-by: David Handermann <[email protected]>
---
.../nifi/processors/standard/MergeContent.java | 23 ++--------------------
.../nifi/processors/standard/SegmentContent.java | 20 -------------------
.../nifi/processors/standard/TestMergeContent.java | 12 +++++------
3 files changed, 8 insertions(+), 47 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
index c0ce6ade8a..5b4afdca81 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java
@@ -154,14 +154,8 @@ public class MergeContent extends BinFiles {
public static final String FRAGMENT_ID_ATTRIBUTE =
FragmentAttributes.FRAGMENT_ID.key();
public static final String FRAGMENT_INDEX_ATTRIBUTE =
FragmentAttributes.FRAGMENT_INDEX.key();
public static final String FRAGMENT_COUNT_ATTRIBUTE =
FragmentAttributes.FRAGMENT_COUNT.key();
-
- // old style attributes
- public static final String SEGMENT_ID_ATTRIBUTE = "segment.identifier";
- public static final String SEGMENT_INDEX_ATTRIBUTE = "segment.index";
- public static final String SEGMENT_COUNT_ATTRIBUTE = "segment.count";
public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
-
public static final AllowableValue METADATA_STRATEGY_USE_FIRST = new
AllowableValue("Use First Metadata", "Use First Metadata",
"For any input format that supports metadata (Avro, e.g.), the
metadata for the first FlowFile in the bin will be set on the output
FlowFile.");
@@ -184,8 +178,7 @@ public class MergeContent extends BinFiles {
"Defragment",
"Defragment",
"Combines fragments that are associated by attributes back into a
single cohesive FlowFile. If using this strategy, all FlowFiles must "
- + "have the attributes <fragment.identifier>, <fragment.count>,
and <fragment.index> or alternatively (for backward compatibility "
- + "purposes) <segment.identifier>, <segment.count>, and
<segment.index>. All FlowFiles with the same value for \"fragment.identifier\" "
+ + "have the attributes <fragment.identifier>, <fragment.count>,
and <fragment.index>. All FlowFiles with the same value for
\"fragment.identifier\" "
+ "will be grouped together. All FlowFiles in this group must have
the same value for the \"fragment.count\" attribute. All FlowFiles "
+ "in this group must have a unique value for the
\"fragment.index\" attribute between 0 and the value of the \"fragment.count\"
attribute.");
@@ -430,19 +423,7 @@ public class MergeContent extends BinFiles {
@Override
protected FlowFile preprocessFlowFile(final ProcessContext context, final
ProcessSession session, final FlowFile flowFile) {
- FlowFile processed = flowFile;
- // handle backward compatibility with old segment attributes
- if (processed.getAttribute(FRAGMENT_COUNT_ATTRIBUTE) == null &&
processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE) != null) {
- processed = session.putAttribute(processed,
FRAGMENT_COUNT_ATTRIBUTE, processed.getAttribute(SEGMENT_COUNT_ATTRIBUTE));
- }
- if (processed.getAttribute(FRAGMENT_INDEX_ATTRIBUTE) == null &&
processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE) != null) {
- processed = session.putAttribute(processed,
FRAGMENT_INDEX_ATTRIBUTE, processed.getAttribute(SEGMENT_INDEX_ATTRIBUTE));
- }
- if (processed.getAttribute(FRAGMENT_ID_ATTRIBUTE) == null &&
processed.getAttribute(SEGMENT_ID_ATTRIBUTE) != null) {
- processed = session.putAttribute(processed, FRAGMENT_ID_ATTRIBUTE,
processed.getAttribute(SEGMENT_ID_ATTRIBUTE));
- }
-
- return processed;
+ return flowFile;
}
@Override
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
index d29bff812d..8dba735ce1 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SegmentContent.java
@@ -57,17 +57,6 @@ import org.apache.nifi.processor.util.StandardValidators;
+ "fragment.identifier, fragment.index, fragment.count,
segment.original.filename; these attributes can then be used by the "
+ "MergeContent processor in order to reconstitute the original
FlowFile")
@WritesAttributes({
- @WritesAttribute(attribute = "segment.identifier",
- description = "All segments produced from the same parent FlowFile
will have the same randomly generated UUID added for this "
- + "attribute. This attribute is added to maintain backward
compatibility, but the fragment.identifier is preferred, as "
- + "it is designed to work in conjunction with the MergeContent
Processor"),
- @WritesAttribute(attribute = "segment.index",
- description = "A one-up number that indicates the ordering of the
segments that were created from a single parent FlowFile. "
- + "This attribute is added to maintain backward compatibility, but
the fragment.index is preferred, as it is designed "
- + "to work in conjunction with the MergeContent Processor"),
- @WritesAttribute(attribute = "segment.count",
- description = "The number of segments generated from the parent
FlowFile. This attribute is added to maintain backward compatibility, "
- + "but the fragment.count is preferred, as it is designed to work
in conjunction with the MergeContent Processor"),
@WritesAttribute(attribute = "fragment.identifier",
description = "All segments produced from the same parent FlowFile
will have the same randomly generated UUID added for this attribute"),
@WritesAttribute(attribute = "fragment.index",
@@ -79,9 +68,6 @@ import org.apache.nifi.processor.util.StandardValidators;
@SeeAlso(MergeContent.class)
public class SegmentContent extends AbstractProcessor {
- public static final String SEGMENT_ID = "segment.identifier";
- public static final String SEGMENT_INDEX = "segment.index";
- public static final String SEGMENT_COUNT = "segment.count";
public static final String SEGMENT_ORIGINAL_FILENAME =
FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
public static final String FRAGMENT_ID =
FragmentAttributes.FRAGMENT_ID.key();
@@ -143,9 +129,6 @@ public class SegmentContent extends AbstractProcessor {
final String originalFileName =
flowFile.getAttribute(CoreAttributes.FILENAME.key());
if (flowFile.getSize() <= segmentSize) {
- flowFile = session.putAttribute(flowFile, SEGMENT_ID, segmentId);
- flowFile = session.putAttribute(flowFile, SEGMENT_INDEX, "1");
- flowFile = session.putAttribute(flowFile, SEGMENT_COUNT, "1");
flowFile = session.putAttribute(flowFile,
SEGMENT_ORIGINAL_FILENAME, originalFileName);
flowFile = session.putAttribute(flowFile, FRAGMENT_ID, segmentId);
@@ -164,8 +147,6 @@ public class SegmentContent extends AbstractProcessor {
}
final Map<String, String> segmentAttributes = new HashMap<>();
- segmentAttributes.put(SEGMENT_ID, segmentId);
- segmentAttributes.put(SEGMENT_COUNT, String.valueOf(totalSegments));
segmentAttributes.put(SEGMENT_ORIGINAL_FILENAME, originalFileName);
segmentAttributes.put(FRAGMENT_ID, segmentId);
@@ -175,7 +156,6 @@ public class SegmentContent extends AbstractProcessor {
for (int i = 1; i <= totalSegments; i++) {
final long segmentOffset = segmentSize * (i - 1);
FlowFile segment = session.clone(flowFile, segmentOffset,
Math.min(segmentSize, flowFile.getSize() - segmentOffset));
- segmentAttributes.put(SEGMENT_INDEX, String.valueOf(i));
segmentAttributes.put(FRAGMENT_INDEX, String.valueOf(i));
segment = session.putAllAttributes(segment, segmentAttributes);
segmentSet.add(segment);
diff --git
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
index 4b2984b6b4..a77f054215 100644
---
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
+++
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestMergeContent.java
@@ -1129,17 +1129,17 @@ public class TestMergeContent {
runner.setProperty(MergeContent.MAX_BIN_AGE, "1 min");
final Map<String, String> attributes = new HashMap<>();
- attributes.put("segment.identifier", "1");
- attributes.put("segment.count", "4");
- attributes.put("segment.index", "1");
+ attributes.put("fragment.identifier", "1");
+ attributes.put("fragment.count", "4");
+ attributes.put("fragment.index", "1");
attributes.put("segment.original.filename", "originalfilename");
runner.enqueue("A Man ".getBytes("UTF-8"), attributes);
- attributes.put("segment.index", "2");
+ attributes.put("fragment.index", "2");
runner.enqueue("A Plan ".getBytes("UTF-8"), attributes);
- attributes.put("segment.index", "3");
+ attributes.put("fragment.index", "3");
runner.enqueue("A Canal ".getBytes("UTF-8"), attributes);
- attributes.put("segment.index", "4");
+ attributes.put("fragment.index", "4");
runner.enqueue("Panama".getBytes("UTF-8"), attributes);
runner.run();