[ https://issues.apache.org/jira/browse/NIFI-9056?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17430622#comment-17430622 ]
Bruno Gante commented on NIFI-9056:
-----------------------------------
Hi [~aheys], I am facing the same problem in one of our custom processors.
Would you mind sharing exactly how you fixed it?
I have the .clone() followed by .write() pattern mentioned above; partial
code below:
{code:java}
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }
    Map<Relationship, FlowFile> relationshipFlowFileMap = new HashMap<>();
    Map<PropertyDescriptor, String> properties = context.getProperties();
    PropertiesParser propertiesParser = new PropertiesParser();
    PdnGwIedrProducer pdnGwIedrProducer = ProcessFactory.get(
            properties.get(DataStackProcessor.PDN_GW_IEDR_TYPE),
            propertiesParser.get(properties.get(DataStackProcessor.KAFKA_TUNNING)),
            properties.get(ENVIRONMENT));
    Reader reader = ParserFactory.get(properties.get(DataStackProcessor.PDN_GW_IEDR_TYPE));
    reader.setDelimiter(context.getProperty(DataStackProcessor.DELIMITER).getValue());
    PdnGwIedrReader pdnGwIedrReader = new PdnGwIedrReader(reader);
    session.read(flowFile, pdnGwIedrReader);
    PdnGwIedrKafkaProducer pdnGwIedrKafkaProducer =
            new PdnGwIedrKafkaProducer(reader, pdnGwIedrProducer, logger);
    PdnGwIedrSuccess pdnGwIedrSuccess = new PdnGwIedrSuccess(new Difference());
    FlowFile original = session.clone(flowFile);
    FlowFile failure = session.write(flowFile, in -> {
        try {
            pdnGwIedrKafkaProducer.process(in, pdnGwIedrReader.getLines());
        } catch (Exception e) {
            pdnGwIedrProducer.close();
            throw new ProcessException("The Kafka producer raised the following error, " + e);
        }
    });
    FlowFile success = session.write(original, out ->
            pdnGwIedrSuccess.process(out, pdnGwIedrReader.getLines(),
                    pdnGwIedrKafkaProducer.getFailures()));
    //relationshipFlowFileMap.put(ORIGINAL, original);
    relationshipFlowFileMap.put(SUCCESS, success);
    relationshipFlowFileMap.put(FAILURE, failure);
    relationshipFlowFileMap.forEach((relationship, flow) -> {
        if (flow.getSize() > 0) {
            session.transfer(flow, relationship);
        } else {
            session.remove(flow);
        }
    });
}
{code}
> Content Repository Filling Up
> -----------------------------
>
> Key: NIFI-9056
> URL: https://issues.apache.org/jira/browse/NIFI-9056
> Project: Apache NiFi
> Issue Type: Bug
> Components: Core Framework
> Affects Versions: 1.13.2
> Reporter: Andrew Heys
> Priority: Critical
> Fix For: 1.15.0
>
>
> We have a clustered NiFi setup that was recently upgraded from 1.11.4 to
> 1.13.2. Since the upgrade, one of the issues we have run into is that the
> Content Repository fills up to the
> nifi.content.repository.archive.backpressure.percentage mark and locks
> processing and the canvas. The only solution at that point is to restart NiFi.
> We have the following properties set:
> nifi.content.repository.archive.backpressure.percentage=95%
> nifi.content.repository.archive.max.usage.percentage=25%
> nifi.content.repository.archive.max.retention.period=2 hours
> The max usage property seems to be completely ignored. Monitoring the NiFi
> cluster disk % for the content repository shows that it slowly fills up over
> time and never decreases. If we pause the input to the entire NiFi flow and
> let all the processing clear out, with 0 flowfiles remaining on the canvas for
> 15+ minutes, the content repository disk usage does not decrease. Currently,
> our only solution is to restart NiFi on a daily cron schedule. After a
> restart, NiFi clears out the 80+ GB in the content repository and usage falls
> to 0%.
>
> There seems to be an issue removing the older content claims in 1.13.2.
> Thanks!
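
For anyone trying to reproduce the symptom described above, the disk usage of the content repository partition can be watched from outside NiFi. The following is only a minimal sketch, not NiFi's own logic: the class name ContentRepoUsage, the method names, and the status labels are hypothetical, and the two thresholds are simply the values quoted in the issue (25% and 95%). The repository path is an assumption and has to be passed as the first argument.

```java
import java.io.IOException;
import java.nio.file.FileStore;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class ContentRepoUsage {

    // Thresholds copied from the properties quoted in the issue description.
    static final double ARCHIVE_MAX_USAGE_PERCENT = 25.0;
    static final double BACKPRESSURE_PERCENT = 95.0;

    /** Percentage of the partition in use, given total and usable bytes. */
    static double usedPercent(long totalBytes, long usableBytes) {
        if (totalBytes <= 0) {
            return 0.0;
        }
        return 100.0 * (totalBytes - usableBytes) / totalBytes;
    }

    /** Hypothetical labels for this sketch; they are not NiFi states. */
    static String classify(double usedPercent) {
        if (usedPercent >= BACKPRESSURE_PERCENT) {
            return "BACKPRESSURE";
        }
        if (usedPercent > ARCHIVE_MAX_USAGE_PERCENT) {
            return "ABOVE_ARCHIVE_MAX";
        }
        return "OK";
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the first argument is the content repository directory.
        Path repo = Paths.get(args.length > 0 ? args[0] : ".");
        FileStore store = Files.getFileStore(repo);
        double used = usedPercent(store.getTotalSpace(), store.getUsableSpace());
        System.out.printf("%s: %.1f%% used (%s)%n", repo, used, classify(used));
    }
}
```

Run periodically (for example from cron) while the flow is idle: a reading that stays above the 25% mark and never drops would match the behaviour reported here.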
--
This message was sent by Atlassian Jira
(v8.3.4#803005)