[ 
https://issues.apache.org/jira/browse/NIFI-1280?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15962813#comment-15962813
 ] 

ASF GitHub Bot commented on NIFI-1280:
--------------------------------------

Github user markap14 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1652#discussion_r110647534
  
    --- Diff: 
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
 ---
    @@ -0,0 +1,158 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.nifi.json;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.util.HashMap;
    +import java.util.LinkedHashMap;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Optional;
    +
    +import org.apache.nifi.logging.ComponentLog;
    +import org.apache.nifi.serialization.MalformedRecordException;
    +import org.apache.nifi.serialization.record.DataType;
    +import org.apache.nifi.serialization.record.MapRecord;
    +import org.apache.nifi.serialization.record.Record;
    +import org.apache.nifi.serialization.record.RecordFieldType;
    +import org.apache.nifi.serialization.record.RecordSchema;
    +import org.apache.nifi.serialization.record.type.ArrayDataType;
    +import org.apache.nifi.serialization.record.type.RecordDataType;
    +import org.apache.nifi.serialization.record.util.DataTypeUtils;
    +import 
org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
    +import org.codehaus.jackson.JsonNode;
    +
    +import com.jayway.jsonpath.Configuration;
    +import com.jayway.jsonpath.DocumentContext;
    +import com.jayway.jsonpath.JsonPath;
    +import com.jayway.jsonpath.PathNotFoundException;
    +import com.jayway.jsonpath.spi.json.JacksonJsonProvider;
    +
    +public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
    +    private static final Configuration STRICT_PROVIDER_CONFIGURATION = 
Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
    +
    +    private final LinkedHashMap<String, JsonPath> jsonPaths;
    +    private final InputStream in;
    +    private RecordSchema schema;
    +    private final String dateFormat;
    +    private final String timeFormat;
    +    private final String timestampFormat;
    +
    +    public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> 
jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog 
logger,
    +        final String dateFormat, final String timeFormat, final String 
timestampFormat)
    +        throws MalformedRecordException, IOException {
    +        super(in, logger);
    +
    +        this.dateFormat = dateFormat;
    +        this.timeFormat = timeFormat;
    +        this.timestampFormat = timestampFormat;
    +
    +        this.schema = schema;
    +        this.jsonPaths = jsonPaths;
    +        this.in = in;
    +    }
    +
    +    @Override
    +    public void close() throws IOException {
    +        in.close();
    +    }
    +
    +    @Override
    +    public RecordSchema getSchema() {
    +        return schema;
    +    }
    +
    +    @Override
    +    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, 
final RecordSchema schema) throws IOException {
    +        if (jsonNode == null) {
    +            return null;
    +        }
    +
    +        final DocumentContext ctx = 
JsonPath.using(STRICT_PROVIDER_CONFIGURATION).parse(jsonNode.toString());
    +        final Map<String, Object> values = new 
HashMap<>(schema.getFieldCount());
    +
    +        for (final Map.Entry<String, JsonPath> entry : 
jsonPaths.entrySet()) {
    +            final String fieldName = entry.getKey();
    +            final DataType desiredType = 
schema.getDataType(fieldName).orElse(null);
    +            if (desiredType == null) {
    +                continue;
    +            }
    +
    +            final JsonPath jsonPath = entry.getValue();
    +
    +            Object value;
    +            try {
    +                value = ctx.read(jsonPath);
    +            } catch (final PathNotFoundException pnfe) {
    +                value = null;
    +            }
    +
    +            value = convert(value, desiredType);
    --- End diff --
    
    Will add a log message. Calling the convert() method with a null value 
simply returns null, so all should be good.


> Create QueryFlowFile Processor
> ------------------------------
>
>                 Key: NIFI-1280
>                 URL: https://issues.apache.org/jira/browse/NIFI-1280
>             Project: Apache NiFi
>          Issue Type: Task
>          Components: Extensions
>            Reporter: Mark Payne
>            Assignee: Mark Payne
>             Fix For: 1.2.0
>
>         Attachments: QueryFlowFile_Record_Reader-Writer_Examples.xml
>
>
> We should have a Processor that allows users to easily filter out specific 
> columns from CSV data. For instance, a user would configure two different 
> properties: "Columns of Interest" (a comma-separated list of column indexes) 
> and "Filtering Strategy" (Keep Only These Columns, Remove Only These Columns).
> We can do this today with ReplaceText, but it is far more difficult than it 
> would be with this Processor, as the user has to use Regular Expressions, 
> etc. with ReplaceText.
> Eventually a Custom UI could even be built that allows a user to upload a 
> Sample CSV and choose which columns to use from there, similar to the way 
> that Excel works when importing CSV by dragging and selecting the desired 
> columns. That would certainly be a larger undertaking and would not need to 
> be done for an initial implementation.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to