[ https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962813#comment-15962813 ]
ASF GitHub Bot commented on NIFI-1280: -------------------------------------- Github user markap14 commented on a diff in the pull request: https://github.com/apache/nifi/pull/1652#discussion_r110647534 --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java --- @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.nifi.json; + +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import org.apache.nifi.logging.ComponentLog; +import org.apache.nifi.serialization.MalformedRecordException; +import org.apache.nifi.serialization.record.DataType; +import org.apache.nifi.serialization.record.MapRecord; +import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordFieldType; +import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.ArrayDataType; +import org.apache.nifi.serialization.record.type.RecordDataType; +import org.apache.nifi.serialization.record.util.DataTypeUtils; +import org.apache.nifi.serialization.record.util.IllegalTypeConversionException; +import org.codehaus.jackson.JsonNode; + +import com.jayway.jsonpath.Configuration; +import com.jayway.jsonpath.DocumentContext; +import com.jayway.jsonpath.JsonPath; +import com.jayway.jsonpath.PathNotFoundException; +import com.jayway.jsonpath.spi.json.JacksonJsonProvider; + +public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader { + private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build(); + + private final LinkedHashMap<String, JsonPath> jsonPaths; + private final InputStream in; + private RecordSchema schema; + private final String dateFormat; + private final String timeFormat; + private final String timestampFormat; + + public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger, + final String dateFormat, final String timeFormat, final String timestampFormat) + throws MalformedRecordException, IOException { + super(in, logger); + + this.dateFormat = dateFormat; + 
this.timeFormat = timeFormat; + this.timestampFormat = timestampFormat; + + this.schema = schema; + this.jsonPaths = jsonPaths; + this.in = in; + } + + @Override + public void close() throws IOException { + in.close(); + } + + @Override + public RecordSchema getSchema() { + return schema; + } + + @Override + protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException { + if (jsonNode == null) { + return null; + } + + final DocumentContext ctx = JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(jsonNode.toString()); + final Map<String, Object> values = new HashMap<>(schema.getFieldCount()); + + for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) { + final String fieldName = entry.getKey(); + final DataType desiredType = schema.getDataType(fieldName).orElse(null); + if (desiredType == null) { + continue; + } + + final JsonPath jsonPath = entry.getValue(); + + Object value; + try { + value = ctx.read(jsonPath); + } catch (final PathNotFoundException pnfe) { + value = null; + } + + value = convert(value, desiredType); --- End diff -- Will add log message; calling the convert() method with null value will simply return null, so all should be good. > Create QueryFlowFile Processor > ------------------------------ > > Key: NIFI-1280 > URL: https://issues.apache.org/jira/browse/NIFI-1280 > Project: Apache NiFi > Issue Type: Task > Components: Extensions > Reporter: Mark Payne > Assignee: Mark Payne > Fix For: 1.2.0 > > Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml > > > We should have a Processor that allows users to easily filter out specific > columns from CSV data. For instance, a user would configure two different > properties: "Columns of Interest" (a comma-separated list of column indexes) > and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns). 
> We can do this today with ReplaceText, but it is far more difficult than it > would be with this Processor, as the user has to use Regular Expressions, > etc. with ReplaceText. > Eventually a Custom UI could even be built that allows a user to upload a > Sample CSV and choose which columns from there, similar to the way that Excel > works when importing CSV by dragging and selecting the desired columns. That > would certainly be a larger undertaking and would not need to be done for an > initial implementation. -- This message was sent by Atlassian JIRA (v6.3.15#6346)