This is an automated email from the ASF dual-hosted git repository. joewitt pushed a commit to branch NIFI-6169-RC2 in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 11a9f10f5b67c53d8ab66dbfd04ffdcba681ead7 Author: Andres Garagiola <[email protected]> AuthorDate: Fri Feb 22 17:15:59 2019 -0300 NIFI-5918 Fix issue with MergeRecord when DefragmentStrategy is on This closes #3334. Signed-off-by: Andres Garagiola <[email protected]> Signed-off-by: Koji Kawamura <[email protected]> --- .../nifi/processors/standard/merge/RecordBin.java | 55 +++++++++------------- .../standard/merge/RecordBinManager.java | 10 ++-- .../standard/merge/RecordBinThresholds.java | 10 ++-- 3 files changed, 31 insertions(+), 44 deletions(-) diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java index d15ba0f..a96e119 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBin.java @@ -63,6 +63,7 @@ public class RecordBin { private RecordSetWriter recordWriter; private ByteCountingOutputStream out; private int recordCount = 0; + private int fragmentCount = 0; private volatile boolean complete = false; private static final AtomicLong idGenerator = new AtomicLong(0L); @@ -114,7 +115,7 @@ public class RecordBin { } boolean flowFileMigrated = false; - + this.fragmentCount++; try { if (isComplete()) { logger.debug("RecordBin.offer for id={} returning false because {} is complete", new Object[] {flowFile.getId(), this}); @@ -202,26 +203,7 @@ public class RecordBin { return false; } - int maxRecords; - final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute(); - if (recordCountAttribute.isPresent()) { - final Optional<String> recordCountValue = flowFiles.stream() - .filter(ff -> ff.getAttribute(recordCountAttribute.get()) != null) - .map(ff -> ff.getAttribute(recordCountAttribute.get())) - .findFirst(); - - if (!recordCountValue.isPresent()) { - return false; - } - - try { - maxRecords = Integer.parseInt(recordCountValue.get()); - } catch (final NumberFormatException e) { - maxRecords = 1; - } - } else { - maxRecords = thresholds.getMaxRecords(); - } + int maxRecords = thresholds.getMaxRecords(); if (recordCount >= maxRecords) { return true; @@ -231,6 +213,22 @@ public class RecordBin { return true; } + Optional<String> fragmentCountAttribute = thresholds.getFragmentCountAttribute(); + if(fragmentCountAttribute != null && fragmentCountAttribute.isPresent()) { + final Optional<String> fragmentCountValue = flowFiles.stream() + .filter(ff -> ff.getAttribute(fragmentCountAttribute.get()) != null) + .map(ff -> ff.getAttribute(fragmentCountAttribute.get())) + .findFirst(); + if (fragmentCountValue.isPresent()) { + try { + int expectedFragments = Integer.parseInt(fragmentCountValue.get()); + if (this.fragmentCount == expectedFragments) + return true; + } catch (NumberFormatException nfe) { + this.logger.error(nfe.getMessage(), nfe); + } + } + } return false; } finally { readLock.unlock(); @@ -243,18 +241,7 @@ public class RecordBin { return currentCount; } - int requiredCount; - final Optional<String> recordCountAttribute = thresholds.getRecordCountAttribute(); - if (recordCountAttribute.isPresent()) { - final String recordCountValue = flowFiles.get(0).getAttribute(recordCountAttribute.get()); - try { - requiredCount = Integer.parseInt(recordCountValue); - } catch (final NumberFormatException e) { - requiredCount = 1; - } - } else { - requiredCount = thresholds.getMinRecords(); - } + int requiredCount = thresholds.getMinRecords(); this.requiredRecordCount = requiredCount; return requiredCount; @@ -347,7 +334,7 @@ public class RecordBin { } // If using defragment mode, and we don't have enough FlowFiles, then we need to fail this bin. - final Optional<String> countAttr = thresholds.getRecordCountAttribute(); + final Optional<String> countAttr = thresholds.getFragmentCountAttribute(); if (countAttr.isPresent()) { // Ensure that at least one FlowFile has a fragment.count attribute and that they all have the same value, if they have a value. Integer expectedBinCount = null; diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java index d1dde2a..f0891a9 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinManager.java @@ -189,17 +189,17 @@ public class RecordBinManager { final PropertyValue maxMillisValue = context.getProperty(MergeRecord.MAX_BIN_AGE); final String maxBinAge = maxMillisValue.getValue(); - final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS).longValue() : Long.MAX_VALUE; + final long maxBinMillis = maxMillisValue.isSet() ? maxMillisValue.asTimePeriod(TimeUnit.MILLISECONDS) : Long.MAX_VALUE; - final String recordCountAttribute; + final String fragmentCountAttribute; final String mergeStrategy = context.getProperty(MergeRecord.MERGE_STRATEGY).getValue(); if (MergeRecord.MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) { - recordCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE; + fragmentCountAttribute = MergeContent.FRAGMENT_COUNT_ATTRIBUTE; } else { - recordCountAttribute = null; + fragmentCountAttribute = null; } - return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, recordCountAttribute); + return new RecordBinThresholds(minRecords, maxRecords, minBytes, maxBytes, maxBinMillis, maxBinAge, fragmentCountAttribute); } diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java index 16c05ac..8f1a5c8 100644 --- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java +++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/merge/RecordBinThresholds.java @@ -26,17 +26,17 @@ public class RecordBinThresholds { private final long maxBytes; private final long maxBinMillis; private final String maxBinAge; - private final Optional<String> recordCountAttribute; + private final Optional<String> fragmentCountAttribute; public RecordBinThresholds(final int minRecords, final int maxRecords, final long minBytes, final long maxBytes, final long maxBinMillis, - final String maxBinAge, final String recordCountAttribute) { + final String maxBinAge, final String fragmentCountAttribute) { this.minRecords = minRecords; this.maxRecords = maxRecords; this.minBytes = minBytes; this.maxBytes = maxBytes; this.maxBinMillis = maxBinMillis; this.maxBinAge = maxBinAge; - this.recordCountAttribute = Optional.ofNullable(recordCountAttribute); + this.fragmentCountAttribute = Optional.ofNullable(fragmentCountAttribute); } public int getMinRecords() { @@ -63,7 +63,7 @@ public class RecordBinThresholds { return maxBinAge; } - public Optional<String> getRecordCountAttribute() { - return recordCountAttribute; + public Optional<String> getFragmentCountAttribute() { + return fragmentCountAttribute; } }
