markap14 commented on a change in pull request #5550:
URL: https://github.com/apache/nifi/pull/5550#discussion_r756140343
##########
File path:
nifi-mock/src/main/java/org/apache/nifi/util/StandardProcessorTestRunner.java
##########
@@ -221,28 +221,18 @@ public void run(final int iterations, final boolean
stopOnFinish, final boolean
} catch (final InterruptedException e1) {
}
- int finishedCount = 0;
- boolean unscheduledRun = false;
for (final Future<Throwable> future : futures) {
try {
final Throwable thrown = future.get(); // wait for the
result
if (thrown != null) {
throw new AssertionError(thrown);
}
-
- if (++finishedCount == 1) {
- unscheduledRun = true;
- unSchedule();
- }
} catch (final Exception e) {
}
}
- if (!unscheduledRun) {
- unSchedule();
- }
-
if (stopOnFinish) {
+ unSchedule();
Review comment:
I'm not sure that I agree with this change. It is much simpler, but it
changes the semantics of how this runs. If the TestRunner is configured to use
more than one thread and multiple iterations, the old way would call
`@OnUnscheduled` methods after the first thread completes, while other threads
are potentially still running. This change makes `@OnUnscheduled` and
`@OnStopped` methods functionally the same in the testing framework, while they
are very different within NiFi. It's a fairly common mistake for developers to
use `@OnUnscheduled` when they should use `@OnStopped`. The code as-is can
highlight such issues in unit tests, while the change prevents it from catching
these mistakes. I do see the need to check the `stopOnFinish` flag, but perhaps
it makes more sense to just update the `if` conditional to
`if (++finishedCount == 1 && stopOnFinish) {`?
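A minimal, self-contained sketch of that suggestion, not the actual
`StandardProcessorTestRunner` code. The class `UnscheduleSketch`, the helper
`completed(...)`, and the returned event list are hypothetical and exist only
to make the ordering visible. Guarding the fallback call with `stopOnFinish`
is also an assumption, since the suggestion above only covers the first `if`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

final class UnscheduleSketch {

    // Hypothetical helper: n futures that have already completed without an error.
    static List<Future<Throwable>> completed(final int n) {
        final List<Future<Throwable>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(CompletableFuture.<Throwable>completedFuture(null));
        }
        return futures;
    }

    // Returns the order of events; "unSchedule" stands in for the real unSchedule() call.
    static List<String> awaitAll(final List<Future<Throwable>> futures, final boolean stopOnFinish) {
        final List<String> events = new ArrayList<>();
        int finishedCount = 0;
        boolean unscheduledRun = false;

        for (final Future<Throwable> future : futures) {
            try {
                final Throwable thrown = future.get(); // wait for the result
                if (thrown != null) {
                    throw new AssertionError(thrown);
                }
                events.add("finished");

                // Suggested condition: unschedule as soon as the first future is done,
                // but only when the caller asked to stop on finish.
                if (++finishedCount == 1 && stopOnFinish) {
                    unscheduledRun = true;
                    events.add("unSchedule");
                }
            } catch (final Exception e) {
                // ignored, as in the hunk under review
            }
        }

        // Assumption: the old fallback is kept, but also guarded by stopOnFinish.
        if (stopOnFinish && !unscheduledRun) {
            events.add("unSchedule");
        }
        return events;
    }

    public static void main(final String[] args) {
        System.out.println(awaitAll(completed(3), true));
        System.out.println(awaitAll(completed(3), false));
    }
}
```

With three completed futures and `stopOnFinish` set, the sketch records
"unSchedule" right after the first "finished" entry rather than at the end, so
the remaining threads would still be running when `@OnUnscheduled` fires.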
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final
ProcessSessionFactory
}
}
- final ProcessSession session = sessionFactory.createSession();
- final List<FlowFile> flowFiles =
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
- if (getLogger().isDebugEnabled()) {
- final List<String> ids = flowFiles.stream().map(ff -> "id=" +
ff.getId()).collect(Collectors.toList());
- getLogger().debug("Pulled {} FlowFiles from queue: {}", new
Object[] {ids.size(), ids});
- }
+ while (isScheduled()) {
+ final ProcessSession session = sessionFactory.createSession();
+ final List<FlowFile> flowFiles =
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+ if (flowFiles.isEmpty()) {
+ break;
+ }
+ if (getLogger().isDebugEnabled()) {
+ final List<String> ids = flowFiles.stream().map(ff -> "id=" +
ff.getId()).collect(Collectors.toList());
+ getLogger().debug("Pulled {} FlowFiles from queue: {}",
ids.size(), ids);
+ }
- final String mergeStrategy =
context.getProperty(MERGE_STRATEGY).getValue();
- final boolean block;
- if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
- block = true;
- } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
- block = true;
- } else {
- block = false;
- }
+ final String mergeStrategy =
context.getProperty(MERGE_STRATEGY).getValue();
+ final boolean block;
+ if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+ block = true;
+ } else if
(context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+ block = true;
+ } else {
+ block = false;
+ }
- try {
- for (final FlowFile flowFile : flowFiles) {
- try {
- binFlowFile(context, flowFile, session, manager, block);
- } catch (final Exception e) {
- getLogger().error("Failed to bin {} due to {}", new
Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
+ try {
+ for (final FlowFile flowFile : flowFiles) {
+ try {
+ binFlowFile(context, flowFile, session, manager,
block);
+ } catch (final Exception e) {
+ getLogger().error("Failed to bin {} due to {}",
flowFile, e);
+ session.transfer(flowFile, REL_FAILURE);
+ }
}
+ } finally {
+ session.commitAsync();
}
- } finally {
- session.commitAsync();
- }
- // If there is no more data queued up, or strategy is defragment,
complete any bin that meets our minimum threshold
- // Otherwise, run one more cycle to process queued FlowFiles to add
more fragment into available bins.
- int completedBins = 0;
- if (flowFiles.isEmpty() ||
MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+ // Complete any bins that have reached their expiration date
try {
- completedBins += manager.completeFullEnoughBins();
+ manager.completeExpiredBins();
} catch (final Exception e) {
- getLogger().error("Failed to merge FlowFiles to create new bin
due to " + e, e);
+ getLogger().error("Failed to merge FlowFiles to create new bin
due to {}", e);
}
}
- // Complete any bins that have reached their expiration date
- try {
- completedBins += manager.completeExpiredBins();
- } catch (final Exception e) {
- getLogger().error("Failed to merge FlowFiles to create new bin due
to " + e, e);
- }
+ if (isScheduled()) {
+ // Complete any bins that have reached their expiration date
+ try {
+ manager.completeExpiredBins();
+ } catch (final Exception e) {
+ getLogger().error("Failed to merge FlowFiles to create new bin
due to {}", e);
Review comment:
Same comment about the args to the logger: pass one argument to fill the
`{}` placeholder and a separate `Throwable` argument for the stack trace.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final
ProcessSessionFactory
}
}
- final ProcessSession session = sessionFactory.createSession();
- final List<FlowFile> flowFiles =
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
- if (getLogger().isDebugEnabled()) {
- final List<String> ids = flowFiles.stream().map(ff -> "id=" +
ff.getId()).collect(Collectors.toList());
- getLogger().debug("Pulled {} FlowFiles from queue: {}", new
Object[] {ids.size(), ids});
- }
+ while (isScheduled()) {
+ final ProcessSession session = sessionFactory.createSession();
+ final List<FlowFile> flowFiles =
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+ if (flowFiles.isEmpty()) {
+ break;
+ }
+ if (getLogger().isDebugEnabled()) {
+ final List<String> ids = flowFiles.stream().map(ff -> "id=" +
ff.getId()).collect(Collectors.toList());
+ getLogger().debug("Pulled {} FlowFiles from queue: {}",
ids.size(), ids);
+ }
- final String mergeStrategy =
context.getProperty(MERGE_STRATEGY).getValue();
- final boolean block;
- if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
- block = true;
- } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
- block = true;
- } else {
- block = false;
- }
+ final String mergeStrategy =
context.getProperty(MERGE_STRATEGY).getValue();
+ final boolean block;
+ if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+ block = true;
+ } else if
(context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+ block = true;
+ } else {
+ block = false;
+ }
- try {
- for (final FlowFile flowFile : flowFiles) {
- try {
- binFlowFile(context, flowFile, session, manager, block);
- } catch (final Exception e) {
- getLogger().error("Failed to bin {} due to {}", new
Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
+ try {
+ for (final FlowFile flowFile : flowFiles) {
+ try {
+ binFlowFile(context, flowFile, session, manager,
block);
+ } catch (final Exception e) {
+ getLogger().error("Failed to bin {} due to {}",
flowFile, e);
+ session.transfer(flowFile, REL_FAILURE);
+ }
}
+ } finally {
+ session.commitAsync();
}
- } finally {
- session.commitAsync();
- }
- // If there is no more data queued up, or strategy is defragment,
complete any bin that meets our minimum threshold
- // Otherwise, run one more cycle to process queued FlowFiles to add
more fragment into available bins.
- int completedBins = 0;
- if (flowFiles.isEmpty() ||
MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+ // Complete any bins that have reached their expiration date
try {
- completedBins += manager.completeFullEnoughBins();
+ manager.completeExpiredBins();
} catch (final Exception e) {
- getLogger().error("Failed to merge FlowFiles to create new bin
due to " + e, e);
+ getLogger().error("Failed to merge FlowFiles to create new bin
due to {}", e);
Review comment:
I don't think this change is correct. It'll result in a log message that
literally says `Failed to merge FlowFiles to create new bin due to {}`, because
the lone `e` is taken as the `Throwable` and nothing is left to fill the `{}`.
Need to use either `... due to " + e, e);` or `... due to {}", e, e);` so that
the first argument fills the `{}` while the second is the `Throwable` argument
that produces the stack trace.
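To illustrate why the single-argument form leaves the `{}` unfilled, here is a
small sketch using a hypothetical `SketchLogger`. It is not NiFi's logger. It
only mirrors two overload shapes, `error(String, Throwable)` and
`error(String, Object...)`, and its formatting rules are assumptions made for
the demonstration. The part that is plain Java is the overload resolution: a
lone `Throwable` argument binds to the `Throwable` overload, not the varargs one.

```java
import java.util.ArrayList;
import java.util.List;

final class LoggerArgsSketch {

    /** Hypothetical stand-in logger; it records lines instead of writing them. */
    static final class SketchLogger {
        final List<String> lines = new ArrayList<>();

        // Selected when the only argument after the message is a Throwable:
        // the message is used verbatim, so any "{}" in it stays as-is.
        void error(final String msg, final Throwable t) {
            lines.add(msg + " [stack trace of " + t.getClass().getSimpleName() + "]");
        }

        // Selected otherwise: fills each "{}" in order. Assumption for this sketch:
        // a trailing Throwable that was not consumed by a "{}" yields the stack trace.
        void error(final String msg, final Object... args) {
            final StringBuilder sb = new StringBuilder();
            int from = 0;
            int used = 0;
            int at;
            while (used < args.length && (at = msg.indexOf("{}", from)) >= 0) {
                sb.append(msg, from, at).append(args[used++]);
                from = at + 2;
            }
            sb.append(msg.substring(from));
            if (used < args.length && args[args.length - 1] instanceof Throwable) {
                sb.append(" [stack trace of ")
                  .append(args[args.length - 1].getClass().getSimpleName())
                  .append("]");
            }
            lines.add(sb.toString());
        }
    }

    static List<String> demo() {
        final SketchLogger logger = new SketchLogger();
        final Exception e = new IllegalStateException("bin full");

        logger.error("due to {}", e);    // binds to error(String, Throwable)
        logger.error("due to {}", e, e); // binds to error(String, Object...)
        return logger.lines;
    }

    public static void main(final String[] args) {
        demo().forEach(System.out::println);
    }
}
```

In this sketch the first call keeps the literal `{}` in its message, while the
second call replaces it with the exception's `toString()` and still carries the
stack trace.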
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final
ProcessSessionFactory
}
}
- final ProcessSession session = sessionFactory.createSession();
- final List<FlowFile> flowFiles =
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
- if (getLogger().isDebugEnabled()) {
- final List<String> ids = flowFiles.stream().map(ff -> "id=" +
ff.getId()).collect(Collectors.toList());
- getLogger().debug("Pulled {} FlowFiles from queue: {}", new
Object[] {ids.size(), ids});
- }
+ while (isScheduled()) {
+ final ProcessSession session = sessionFactory.createSession();
+ final List<FlowFile> flowFiles =
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+ if (flowFiles.isEmpty()) {
+ break;
+ }
+ if (getLogger().isDebugEnabled()) {
+ final List<String> ids = flowFiles.stream().map(ff -> "id=" +
ff.getId()).collect(Collectors.toList());
+ getLogger().debug("Pulled {} FlowFiles from queue: {}",
ids.size(), ids);
+ }
- final String mergeStrategy =
context.getProperty(MERGE_STRATEGY).getValue();
- final boolean block;
- if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
- block = true;
- } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
- block = true;
- } else {
- block = false;
- }
+ final String mergeStrategy =
context.getProperty(MERGE_STRATEGY).getValue();
+ final boolean block;
+ if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+ block = true;
+ } else if
(context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+ block = true;
+ } else {
+ block = false;
+ }
- try {
- for (final FlowFile flowFile : flowFiles) {
- try {
- binFlowFile(context, flowFile, session, manager, block);
- } catch (final Exception e) {
- getLogger().error("Failed to bin {} due to {}", new
Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
+ try {
+ for (final FlowFile flowFile : flowFiles) {
+ try {
+ binFlowFile(context, flowFile, session, manager,
block);
+ } catch (final Exception e) {
+ getLogger().error("Failed to bin {} due to {}",
flowFile, e);
+ session.transfer(flowFile, REL_FAILURE);
+ }
}
+ } finally {
+ session.commitAsync();
}
- } finally {
- session.commitAsync();
- }
- // If there is no more data queued up, or strategy is defragment,
complete any bin that meets our minimum threshold
- // Otherwise, run one more cycle to process queued FlowFiles to add
more fragment into available bins.
- int completedBins = 0;
- if (flowFiles.isEmpty() ||
MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+ // Complete any bins that have reached their expiration date
try {
- completedBins += manager.completeFullEnoughBins();
+ manager.completeExpiredBins();
} catch (final Exception e) {
- getLogger().error("Failed to merge FlowFiles to create new bin
due to " + e, e);
+ getLogger().error("Failed to merge FlowFiles to create new bin
due to {}", e);
}
}
- // Complete any bins that have reached their expiration date
- try {
- completedBins += manager.completeExpiredBins();
- } catch (final Exception e) {
- getLogger().error("Failed to merge FlowFiles to create new bin due
to " + e, e);
- }
+ if (isScheduled()) {
+ // Complete any bins that have reached their expiration date
+ try {
+ manager.completeExpiredBins();
+ } catch (final Exception e) {
+ getLogger().error("Failed to merge FlowFiles to create new bin
due to {}", e);
+ }
+
+ // Complete any bins that meet their minimum size requirements
+ try {
+ manager.completeFullEnoughBins();
+ } catch (final Exception e) {
+ getLogger().error("Failed to merge FlowFiles to create new bin
due to {}", e);
Review comment:
Same comment about the args to the logger: pass one argument to fill the
`{}` placeholder and a separate `Throwable` argument for the stack trace.
##########
File path:
nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeRecord.java
##########
@@ -323,56 +323,64 @@ public void onTrigger(final ProcessContext context, final
ProcessSessionFactory
}
}
- final ProcessSession session = sessionFactory.createSession();
- final List<FlowFile> flowFiles =
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
- if (getLogger().isDebugEnabled()) {
- final List<String> ids = flowFiles.stream().map(ff -> "id=" +
ff.getId()).collect(Collectors.toList());
- getLogger().debug("Pulled {} FlowFiles from queue: {}", new
Object[] {ids.size(), ids});
- }
+ while (isScheduled()) {
+ final ProcessSession session = sessionFactory.createSession();
+ final List<FlowFile> flowFiles =
session.get(FlowFileFilters.newSizeBasedFilter(250, DataUnit.KB, 250));
+ if (flowFiles.isEmpty()) {
+ break;
+ }
+ if (getLogger().isDebugEnabled()) {
+ final List<String> ids = flowFiles.stream().map(ff -> "id=" +
ff.getId()).collect(Collectors.toList());
+ getLogger().debug("Pulled {} FlowFiles from queue: {}",
ids.size(), ids);
+ }
- final String mergeStrategy =
context.getProperty(MERGE_STRATEGY).getValue();
- final boolean block;
- if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
- block = true;
- } else if (context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
- block = true;
- } else {
- block = false;
- }
+ final String mergeStrategy =
context.getProperty(MERGE_STRATEGY).getValue();
+ final boolean block;
+ if (MERGE_STRATEGY_DEFRAGMENT.getValue().equals(mergeStrategy)) {
+ block = true;
+ } else if
(context.getProperty(CORRELATION_ATTRIBUTE_NAME).isSet()) {
+ block = true;
+ } else {
+ block = false;
+ }
- try {
- for (final FlowFile flowFile : flowFiles) {
- try {
- binFlowFile(context, flowFile, session, manager, block);
- } catch (final Exception e) {
- getLogger().error("Failed to bin {} due to {}", new
Object[] {flowFile, e});
- session.transfer(flowFile, REL_FAILURE);
+ try {
+ for (final FlowFile flowFile : flowFiles) {
+ try {
+ binFlowFile(context, flowFile, session, manager,
block);
+ } catch (final Exception e) {
+ getLogger().error("Failed to bin {} due to {}",
flowFile, e);
Review comment:
Need to pass one argument to fill each `{}` placeholder and a separate
`Throwable` argument for the stack trace.