Lehel44 commented on a change in pull request #5038:
URL: https://github.com/apache/nifi/pull/5038#discussion_r644112482
##########
File path: nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/UpdateRecord.java
##########
@@ -149,6 +152,241 @@ public void createRecordPaths(final ProcessContext context) {
this.recordPaths = recordPaths;
}
+ private static class RecordWrapper {
+ private Record originalRecord;
+ private Record currentRecord;
+ private Path path;
+ private Map<RecordField, FieldValue> actualFields = new HashMap<>(); // The record might have fewer fields than its schema
+
+ public RecordWrapper(Record record, String path) {
+ this.originalRecord = record;
+ this.path = new Path(path);
+
+ if (this.path.isEmpty()) {
+ this.currentRecord = originalRecord;
+ } else {
+ this.currentRecord = getRecordByPath(this.path.toString() + "/*");
+ }
+
+ if (this.currentRecord != null) {
+ List<FieldValue> listOfActualFields = getSelectedFields(this.path + "/*");
+ listOfActualFields.forEach(field -> actualFields.put(field.getField(), field));
+ }
+ }
+
+ public boolean recordExists() {
+ return currentRecord != null;
+ }
+
+ public Record getRecord() {
+ return currentRecord;
+ }
+
+ public boolean actuallyContainsField(RecordField field) {
+ return actualFields.containsKey(field);
+ }
+
+ public FieldValue getActualFieldValue(RecordField field) {
+ return actualFields.get(field);
+ }
+
+ public String getPath() {
+ return path.toString();
+ }
+
+ public Record getOriginalRecord() {
+ return originalRecord;
+ }
+
+ private Record getRecordByPath(String recordPath) {
+ Record resultRecord;
+ Optional<FieldValue> fieldValueOption = RecordPath.compile(recordPath).evaluate(originalRecord).getSelectedFields().findAny();
+ if (fieldValueOption.isPresent()) {
+ resultRecord = getParentRecordOfField(fieldValueOption.get());
+ } else {
+ Path newPath = new Path(recordPath); // Cutting the "/*" from the end.
+ if (newPath.isEmpty()) {
+ resultRecord = originalRecord;
+ } else {
+ resultRecord = getTargetAsRecord(newPath.toString());
+ }
+ }
+ return resultRecord;
+ }
+
+ private Record getParentRecordOfField(FieldValue fieldValue) {
+ Optional<Record> parentRecordOption = fieldValue.getParentRecord();
+ if (parentRecordOption.isPresent()) {
+ return parentRecordOption.get();
+ } else {
+ return originalRecord;
+ }
+ }
+
+ private Record getTargetAsRecord(String path) {
+ Optional<FieldValue> targetFieldOption = RecordPath.compile(path).evaluate(originalRecord).getSelectedFields().findAny();
+ //TODO: is it possible that there are two elements in a record with the same path ?
+ if (targetFieldOption.isPresent()) {
+ FieldValue targetField = targetFieldOption.get();
+ return getFieldValueAsRecord(targetField);
+ } else {
+ return null;
+ }
+ }
+
+ // Returns null if fieldValue is not of type Record
+ private Record getFieldValueAsRecord(FieldValue fieldValue) {
+ Object targetObject = fieldValue.getValue();
+ if (targetObject instanceof Record) {
+ return ((Record) targetObject);
+ } else {
+ return null;
+ }
+ }
+
+ private List<FieldValue> getSelectedFields(String path) {
+ RecordPath recordPath = RecordPath.compile(path);
+ RecordPathResult recordPathResult = recordPath.evaluate(originalRecord);
+ return recordPathResult.getSelectedFields().collect(Collectors.toList());
+ }
+ }
+
+ private static class Path {
+ private final String path;
+
+ public Path(String path) {
+ if (path.length() == 0 || path.equals("/") || path.equals("/*")) {
+ this.path = "";
+ } else if (path.endsWith("/")) {
+ this.path = path.substring(0, path.length() - 1);
+ } else if (path.endsWith("/*")) {
+ this.path = path.substring(0, path.length() - 2);
+ } else {
+ this.path = path;
+ }
+ }
+
+ public boolean isEmpty() {
+ return "".equals(path);
+ }
+
+ @Override
+ public String toString() {
+ return path;
+ }
+ }
+
+
+ private Record remove(Record record, List<FieldValue> fieldsToRemove) {
+ if (fieldsToRemove.isEmpty()) {
+ return record;
+ } else {
+ RecordWrapper wrappedRecord = new RecordWrapper(record, "/");
+ List<String> fieldsToRemoveWithPath = mapFieldValueToPath(fieldsToRemove);
+ return remove(wrappedRecord, fieldsToRemove, fieldsToRemoveWithPath);
+ }
+ }
+
+ private List<String> mapFieldValueToPath(List<FieldValue> fieldValues) {
+ List<String> paths = new ArrayList<>();
+ fieldValues.forEach(field -> paths.add(getAbsolutePath(field)));
+ return paths;
+ }
+
+ private String getAbsolutePath(FieldValue fieldValue) {
+ if (!fieldValue.getParent().isPresent()) {
+ return "";
+ }
+ if ("root".equals(fieldValue.getParent().get().getField().getFieldName())) {
+ return "/" + fieldValue.getField().getFieldName();
+ }
+ return getAbsolutePath(fieldValue.getParent().get()) + "/" + fieldValue.getField().getFieldName();
+ }
+
+ private Record remove(RecordWrapper wrappedRecord, List<FieldValue> fieldsToRemove, List<String> fieldsToRemoveWithPath) {
+ List<RecordField> newSchemaFieldList = new ArrayList<>();
+ Map<String, Object> newRecordMap = new LinkedHashMap<>();
+
+ for (RecordField schemaField : wrappedRecord.getRecord().getSchema().getFields()) { // Iterate the fields of the record's schema
+ if (wrappedRecord.actuallyContainsField(schemaField)) { // The record actually has data or null for that field
+ handleFieldInSchemaAndData(schemaField, wrappedRecord, fieldsToRemove, fieldsToRemoveWithPath, newSchemaFieldList, newRecordMap);
+ } else { // The record does not even contain the field (no data for it). We still need to check if it needs to be deleted from the schema.
+ handleFieldInSchema(schemaField, wrappedRecord, fieldsToRemove, newSchemaFieldList, fieldsToRemoveWithPath);
+ }
+ }
+
+ RecordSchema newSchema = new SimpleRecordSchema(newSchemaFieldList);
+ Record newRecord = new MapRecord(newSchema, newRecordMap);
Review comment:
I think you can return the result of this call directly here, instead of assigning it to a local variable first.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org