tpalfy commented on code in PR #6444:
URL: https://github.com/apache/nifi/pull/6444#discussion_r980339020


##########
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -76,26 +81,60 @@ private AbstractJsonRowRecordReader(final ComponentLog logger, final String date
         LAZY_TIMESTAMP_FORMAT = () -> tsf;
     }
 
-    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat)
+    protected AbstractJsonRowRecordReader(final InputStream in,
+                                          final ComponentLog logger,
+                                          final String dateFormat,
+                                          final String timeFormat,
+                                          final String timestampFormat)
             throws IOException, MalformedRecordException {
 
-        this(in, logger, dateFormat, timeFormat, timestampFormat, null, null);
+        this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null);
     }
 
-    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
-                                          final StartingFieldStrategy strategy, final String nestedFieldName) throws IOException, MalformedRecordException {
+    /**
+     * Constructor with initial logic for JSON to NiFi record parsing.
+     *
+     * @param in                     the input stream to parse
+     * @param logger                 ComponentLog
+     * @param dateFormat             format for parsing date fields
+     * @param timeFormat             format for parsing time fields
+     * @param timestampFormat        format for parsing timestamp fields
+     * @param strategy               whether to start processing from a specific field
+     * @param nestedFieldName        the name of the field to start the processing from
+     * @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can
+     *                               be accessed by calling {@link #getCapturedFields()}
+     * @throws IOException              in case of JSON stream processing failure
+     * @throws MalformedRecordException in case of malformed JSON input
+     */
+    protected AbstractJsonRowRecordReader(final InputStream in,
+                                          final ComponentLog logger,
+                                          final String dateFormat,
+                                          final String timeFormat,
+                                          final String timestampFormat,
+                                          final StartingFieldStrategy strategy,
+                                          final String nestedFieldName,
+                                          final BiPredicate<String, String> captureFieldPredicate)
+            throws IOException, MalformedRecordException {
 
         this(logger, dateFormat, timeFormat, timestampFormat);
 
         this.strategy = strategy;
+        this.captureFieldPredicate = captureFieldPredicate;
+        capturedFields = new HashMap<>();
 
         try {
             jsonParser = jsonFactory.createParser(in);
             jsonParser.setCodec(codec);
 
             if (strategy == StartingFieldStrategy.NESTED_FIELD) {
-                final SerializedString serializedStartingFieldName = new SerializedString(nestedFieldName);
-                while (!jsonParser.nextFieldName(serializedStartingFieldName) && jsonParser.hasCurrentToken());
+                while (jsonParser.nextToken() != null) {
+                    if (nestedFieldName.equals(jsonParser.getCurrentName())) {
+                        break;
+                    }

Review Comment:
   ```suggestion
                       if (nestedFieldName.equals(jsonParser.getCurrentName())) {
                           logger.debug("Parsing starting at nested field [{}]", nestedFieldName);
                           break;
                       }
   ```



##########
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java:
##########
@@ -76,26 +81,60 @@ private AbstractJsonRowRecordReader(final ComponentLog logger, final String date
         LAZY_TIMESTAMP_FORMAT = () -> tsf;
     }
 
-    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat)
+    protected AbstractJsonRowRecordReader(final InputStream in,
+                                          final ComponentLog logger,
+                                          final String dateFormat,
+                                          final String timeFormat,
+                                          final String timestampFormat)
             throws IOException, MalformedRecordException {
 
-        this(in, logger, dateFormat, timeFormat, timestampFormat, null, null);
+        this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null);
     }
 
-    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
-                                          final StartingFieldStrategy strategy, final String nestedFieldName) throws IOException, MalformedRecordException {
+    /**
+     * Constructor with initial logic for JSON to NiFi record parsing.
+     *
+     * @param in                     the input stream to parse
+     * @param logger                 ComponentLog
+     * @param dateFormat             format for parsing date fields
+     * @param timeFormat             format for parsing time fields
+     * @param timestampFormat        format for parsing timestamp fields
+     * @param strategy               whether to start processing from a specific field
+     * @param nestedFieldName        the name of the field to start the processing from
+     * @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can
+     *                               be accessed by calling {@link #getCapturedFields()}
+     * @throws IOException              in case of JSON stream processing failure
+     * @throws MalformedRecordException in case of malformed JSON input
+     */
+    protected AbstractJsonRowRecordReader(final InputStream in,
+                                          final ComponentLog logger,
+                                          final String dateFormat,
+                                          final String timeFormat,
+                                          final String timestampFormat,
+                                          final StartingFieldStrategy strategy,
+                                          final String nestedFieldName,
+                                          final BiPredicate<String, String> captureFieldPredicate)
+            throws IOException, MalformedRecordException {
 
         this(logger, dateFormat, timeFormat, timestampFormat);
 
         this.strategy = strategy;
+        this.captureFieldPredicate = captureFieldPredicate;
+        capturedFields = new HashMap<>();
 
         try {
             jsonParser = jsonFactory.createParser(in);
             jsonParser.setCodec(codec);
 
             if (strategy == StartingFieldStrategy.NESTED_FIELD) {
-                final SerializedString serializedStartingFieldName = new SerializedString(nestedFieldName);
-                while (!jsonParser.nextFieldName(serializedStartingFieldName) && jsonParser.hasCurrentToken());
+                while (jsonParser.nextToken() != null) {
+                    if (nestedFieldName.equals(jsonParser.getCurrentName())) {
+                        break;
+                    }
+                    if (captureFieldPredicate != null) {
+                        captureCurrentField(captureFieldPredicate);
+                    }
+                }
                 logger.debug("Parsing starting at nested field [{}]", nestedFieldName);

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to