tpalfy commented on code in PR #6444:
URL: https://github.com/apache/nifi/pull/6444#discussion_r980339020
##########
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -76,26 +81,60 @@ private AbstractJsonRowRecordReader(final ComponentLog
logger, final String date
LAZY_TIMESTAMP_FORMAT = () -> tsf;
}
- protected AbstractJsonRowRecordReader(final InputStream in, final
ComponentLog logger, final String dateFormat, final String timeFormat, final
String timestampFormat)
+ protected AbstractJsonRowRecordReader(final InputStream in,
+ final ComponentLog logger,
+ final String dateFormat,
+ final String timeFormat,
+ final String timestampFormat)
throws IOException, MalformedRecordException {
- this(in, logger, dateFormat, timeFormat, timestampFormat, null, null);
+ this(in, logger, dateFormat, timeFormat, timestampFormat, null, null,
null);
}
- protected AbstractJsonRowRecordReader(final InputStream in, final
ComponentLog logger, final String dateFormat, final String timeFormat, final
String timestampFormat,
- final StartingFieldStrategy
strategy, final String nestedFieldName) throws IOException,
MalformedRecordException {
+ /**
+ * Constructor with initial logic for JSON to NiFi record parsing.
+ *
+ * @param in the input stream to parse
+ * @param logger ComponentLog
+ * @param dateFormat format for parsing date fields
+ * @param timeFormat format for parsing time fields
+ * @param timestampFormat format for parsing timestamp fields
+ * @param strategy whether to start processing from a
specific field
+ * @param nestedFieldName the name of the field to start the
processing from
+ * @param captureFieldPredicate predicate that takes a JSON fieldName and
fieldValue to capture top-level non-processed fields which can
+ * be accessed by calling {@link
#getCapturedFields()}
+ * @throws IOException in case of JSON stream processing
failure
+ * @throws MalformedRecordException in case of malformed JSON input
+ */
+ protected AbstractJsonRowRecordReader(final InputStream in,
+ final ComponentLog logger,
+ final String dateFormat,
+ final String timeFormat,
+ final String timestampFormat,
+ final StartingFieldStrategy strategy,
+ final String nestedFieldName,
+ final BiPredicate<String, String>
captureFieldPredicate)
+ throws IOException, MalformedRecordException {
this(logger, dateFormat, timeFormat, timestampFormat);
this.strategy = strategy;
+ this.captureFieldPredicate = captureFieldPredicate;
+ capturedFields = new HashMap<>();
try {
jsonParser = jsonFactory.createParser(in);
jsonParser.setCodec(codec);
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
- final SerializedString serializedStartingFieldName = new
SerializedString(nestedFieldName);
- while (!jsonParser.nextFieldName(serializedStartingFieldName)
&& jsonParser.hasCurrentToken());
+ while (jsonParser.nextToken() != null) {
+ if (nestedFieldName.equals(jsonParser.getCurrentName())) {
+ break;
+ }
Review Comment:
```suggestion
if (nestedFieldName.equals(jsonParser.getCurrentName())) {
    logger.debug("Parsing starting at nested field [{}]", nestedFieldName);
    break;
}
```
##########
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -76,26 +81,60 @@ private AbstractJsonRowRecordReader(final ComponentLog
logger, final String date
LAZY_TIMESTAMP_FORMAT = () -> tsf;
}
- protected AbstractJsonRowRecordReader(final InputStream in, final
ComponentLog logger, final String dateFormat, final String timeFormat, final
String timestampFormat)
+ protected AbstractJsonRowRecordReader(final InputStream in,
+ final ComponentLog logger,
+ final String dateFormat,
+ final String timeFormat,
+ final String timestampFormat)
throws IOException, MalformedRecordException {
- this(in, logger, dateFormat, timeFormat, timestampFormat, null, null);
+ this(in, logger, dateFormat, timeFormat, timestampFormat, null, null,
null);
}
- protected AbstractJsonRowRecordReader(final InputStream in, final
ComponentLog logger, final String dateFormat, final String timeFormat, final
String timestampFormat,
- final StartingFieldStrategy
strategy, final String nestedFieldName) throws IOException,
MalformedRecordException {
+ /**
+ * Constructor with initial logic for JSON to NiFi record parsing.
+ *
+ * @param in the input stream to parse
+ * @param logger ComponentLog
+ * @param dateFormat format for parsing date fields
+ * @param timeFormat format for parsing time fields
+ * @param timestampFormat format for parsing timestamp fields
+ * @param strategy whether to start processing from a
specific field
+ * @param nestedFieldName the name of the field to start the
processing from
+ * @param captureFieldPredicate predicate that takes a JSON fieldName and
fieldValue to capture top-level non-processed fields which can
+ * be accessed by calling {@link
#getCapturedFields()}
+ * @throws IOException in case of JSON stream processing
failure
+ * @throws MalformedRecordException in case of malformed JSON input
+ */
+ protected AbstractJsonRowRecordReader(final InputStream in,
+ final ComponentLog logger,
+ final String dateFormat,
+ final String timeFormat,
+ final String timestampFormat,
+ final StartingFieldStrategy strategy,
+ final String nestedFieldName,
+ final BiPredicate<String, String>
captureFieldPredicate)
+ throws IOException, MalformedRecordException {
this(logger, dateFormat, timeFormat, timestampFormat);
this.strategy = strategy;
+ this.captureFieldPredicate = captureFieldPredicate;
+ capturedFields = new HashMap<>();
try {
jsonParser = jsonFactory.createParser(in);
jsonParser.setCodec(codec);
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
- final SerializedString serializedStartingFieldName = new
SerializedString(nestedFieldName);
- while (!jsonParser.nextFieldName(serializedStartingFieldName)
&& jsonParser.hasCurrentToken());
+ while (jsonParser.nextToken() != null) {
+ if (nestedFieldName.equals(jsonParser.getCurrentName())) {
+ break;
+ }
+ if (captureFieldPredicate != null) {
+ captureCurrentField(captureFieldPredicate);
+ }
+ }
logger.debug("Parsing starting at nested field [{}]", nestedFieldName);
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]