[
https://issues.apache.org/jira/browse/NIFI-14367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17935541#comment-17935541
]
Jordan Sammut edited comment on NIFI-14367 at 3/14/25 3:08 PM:
---------------------------------------------------------------
Did some more investigating, and this is where the issue lies:
{code:java}
@Override
public RecordSchema getSchema(final Map<String, String> variables, final InputStream contentStream, final RecordSchema readSchema) throws IOException {
    // We expect to be able to mark/reset any length because we expect that the underlying stream here will be
    // a ContentClaimInputStream, which is able to re-read the content regardless of how much data is read.
    contentStream.mark(1_000_000);
    try {
        final RecordSource<T> recordSource = recordSourceFactory.create(variables, new NonCloseableInputStream(contentStream));
        final RecordSchema schema = schemaInference.inferSchema(recordSource);
        logger.debug("Successfully inferred schema {}", schema);
        return schema;
    } finally {
        contentStream.reset();
    }
} {code}
In this case, the InputStream does not seem to be of type *ContentClaimInputStream*, so the mark is invalidated once more than 1,000,000 bytes (~1MB) are read. Would appreciate some aid here on how this can be resolved.
!image-2025-03-14-15-43-41-942.png|width=604,height=256!
The InputStream is being wrapped in a BufferedInputStream in the *PublishKafka.java* class, in the *process* method:
{code:java}
public void process(final InputStream in) {
    try (final InputStream is = new BufferedInputStream(in)) {
        final Iterator<KafkaRecord> records = kafkaConverter.convert(attributes, is, inputLength);
        producerService.send(records, publishContext);
    } catch (final Exception e) {
        // on data pre-process failure, indicate this to controller service
        publishContext.setException(e);
        producerService.send(Collections.emptyIterator(), publishContext);
    }
}{code}
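For what it's worth, the failure mode can be reproduced outside NiFi with a plain BufferedInputStream: once more bytes than the mark limit are read, the mark is dropped and reset() throws the same "Resetting to invalid mark" IOException seen above. A minimal standalone sketch (the class name MarkResetDemo is mine, not from the NiFi codebase; content sized just over the 1,000,000-byte limit used by getSchema):

```java
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

public class MarkResetDemo {
    public static void main(String[] args) throws IOException {
        // Content just over the 1 MB mark limit, standing in for a large NDJSON FlowFile.
        final byte[] content = new byte[1_000_001];
        final InputStream in = new BufferedInputStream(new ByteArrayInputStream(content));

        in.mark(1_000_000);

        // Consume the whole stream, as schema inference does while reading records.
        final byte[] buffer = new byte[8192];
        while (in.read(buffer) != -1) {
            // keep reading past the mark limit
        }

        try {
            in.reset(); // BufferedInputStream has discarded the mark by now
        } catch (final IOException e) {
            System.out.println(e.getMessage()); // "Resetting to invalid mark"
        }
    }
}
```

A ContentClaimInputStream does not have this restriction because it can seek back in the underlying content claim, which is presumably why getSchema assumes mark/reset is safe at any length.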
> Kafka Publisher fails when NDJson content is larger than 1MB
> ------------------------------------------------------------
>
> Key: NIFI-14367
> URL: https://issues.apache.org/jira/browse/NIFI-14367
> Project: Apache NiFi
> Issue Type: Bug
> Affects Versions: 2.3.0
> Environment: Docker image running NiFi 2.3.0
> Reporter: Jordan Sammut
> Priority: Blocker
> Fix For: 2.4.0
>
> Attachments: image-2025-03-13-18-06-00-557.png,
> image-2025-03-13-18-06-37-150.png, image-2025-03-13-18-07-36-620.png,
> image-2025-03-14-15-43-41-942.png
>
>
> The Kafka Publisher seemingly fails with no error message when passing NDJson
> content that is larger than 1MB. There seems to be a limit somewhere. I have
> attempted to debug the code myself, and it looks like the following error is
> returned on this line in the *Iterator<KafkaRecord> convert* method:
> {code:java}
> readerFactory.createRecordReader(attributes, in, inputLength, logger) {code}
> Error:
> {code:java}
> java.io.IOException: Resetting to invalid mark {code}
> A basic JsonTreeReader and RecordSetWriter were utilized:
> !image-2025-03-13-18-06-00-557.png|width=314,height=210!
> !image-2025-03-13-18-06-37-150.png|width=319,height=213!
> !image-2025-03-13-18-07-36-620.png|width=325,height=215!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)