Repository: incubator-nifi
Updated Branches:
  refs/heads/nifi-27 2f2474efb -> 5b0d8a5dc
NIFI-12: Remove Processors even if their @OnRemoved methods throw Exceptions Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/cb2e855f Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/cb2e855f Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/cb2e855f Branch: refs/heads/nifi-27 Commit: cb2e855fc7c42536887b055baadf93683d764a47 Parents: f60a97b Author: Mark Payne <[email protected]> Authored: Wed Dec 10 12:49:31 2014 -0500 Committer: Mark Payne <[email protected]> Committed: Wed Dec 10 12:49:31 2014 -0500 ---------------------------------------------------------------------- .../nifi/groups/StandardProcessGroup.java | 4 ++-- .../nifi/processors/standard/MergeContent.java | 22 +++++++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb2e855f/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java ---------------------------------------------------------------------- diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java index 8aafb58..1064536 100644 --- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java +++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/groups/StandardProcessGroup.java @@ -46,6 +46,7 @@ import org.apache.nifi.controller.label.Label; import org.apache.nifi.controller.service.ControllerServiceProvider; import org.apache.nifi.logging.LogRepositoryFactory; import org.apache.nifi.nar.NarCloseable; +import org.apache.nifi.processor.SimpleProcessLogger; import 
org.apache.nifi.processor.StandardProcessContext; import org.apache.nifi.processor.annotation.OnRemoved; import org.apache.nifi.processor.annotation.OnShutdown; @@ -53,7 +54,6 @@ import org.apache.nifi.remote.RemoteGroupPort; import org.apache.nifi.remote.RootGroupPort; import org.apache.nifi.util.NiFiProperties; import org.apache.nifi.util.ReflectionUtils; - import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.commons.lang3.builder.ToStringBuilder; @@ -664,7 +664,7 @@ public final class StandardProcessGroup implements ProcessGroup { try (final NarCloseable x = NarCloseable.withNarLoader()) { final StandardProcessContext processContext = new StandardProcessContext(processor, controllerServiceProvider, encryptor); - ReflectionUtils.invokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext); + ReflectionUtils.quietlyInvokeMethodsWithAnnotation(OnRemoved.class, processor.getProcessor(), processContext); } catch (final Exception e) { throw new ProcessorLifeCycleException("Failed to invoke 'OnRemoved' methods of " + processor, e); } http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cb2e855f/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java ---------------------------------------------------------------------- diff --git a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java index d443e00..9a932f0 100644 --- a/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java +++ b/nar-bundles/standard-bundle/standard-processors/src/main/java/org/apache/nifi/processors/standard/MergeContent.java @@ -76,7 +76,6 @@ import org.apache.nifi.util.FlowFilePackagerV1; import 
org.apache.nifi.util.FlowFilePackagerV2; import org.apache.nifi.util.FlowFilePackagerV3; import org.apache.nifi.util.ObjectHolder; - import org.apache.commons.compress.archivers.tar.TarArchiveEntry; import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream; @@ -317,6 +316,7 @@ public class MergeContent extends AbstractSessionFactoryProcessor { return Files.readAllBytes(Paths.get(filename)); } + @Override public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException { int binsAdded = binFlowFiles(context, sessionFactory); @@ -331,6 +331,7 @@ public class MergeContent extends AbstractSessionFactoryProcessor { context.yield(); } } + private int migrateBins(final ProcessContext context) { int added = 0; @@ -548,20 +549,27 @@ public class MergeContent extends AbstractSessionFactoryProcessor { public void onScheduled(final ProcessContext context) throws IOException { binManager.setMinimumSize(context.getProperty(MIN_SIZE).asDataSize(DataUnit.B).longValue()); - if (context.getProperty(MAX_BIN_AGE).getValue() != null) { + if (context.getProperty(MAX_BIN_AGE).isSet() ) { binManager.setMaxBinAge(context.getProperty(MAX_BIN_AGE).asTimePeriod(TimeUnit.SECONDS).intValue()); + } else { + binManager.setMaxBinAge(Integer.MAX_VALUE); } - - if (context.getProperty(MAX_SIZE).getValue() != null) { + + if ( context.getProperty(MAX_SIZE).isSet() ) { binManager.setMaximumSize(context.getProperty(MAX_SIZE).asDataSize(DataUnit.B).longValue()); + } else { + binManager.setMaximumSize(Long.MAX_VALUE); } - + if (MERGE_STRATEGY_DEFRAGMENT.equals(context.getProperty(MERGE_STRATEGY).getValue())) { binManager.setFileCountAttribute(FRAGMENT_COUNT_ATTRIBUTE); } else { binManager.setMinimumEntries(context.getProperty(MIN_ENTRIES).asInteger()); - if (context.getProperty(MAX_ENTRIES).getValue() != null) { - binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger()); + + if ( context.getProperty(MAX_ENTRIES).isSet() 
) { + binManager.setMaximumEntries(context.getProperty(MAX_ENTRIES).asInteger().intValue()); + } else { + binManager.setMaximumEntries(Integer.MAX_VALUE); } }
