[ https://issues.apache.org/jira/browse/NIFI-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651403#comment-15651403 ]

ASF GitHub Bot commented on NIFI-969:
-------------------------------------

Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1179#discussion_r87228142
  
    --- Diff: nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java ---
    @@ -136,81 +167,54 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
                 return;
             }
     
    -        final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue();
    -        final boolean useContainer = containerOption.equals(CONTAINER_ARRAY);
    -        // Wrap a single record (inclusive of no records) only when a container is being used
    -        final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer;
    -
    -        final String stringSchema = context.getProperty(SCHEMA).getValue();
    -        final boolean schemaLess = stringSchema != null;
    -
             try {
                 flowFile = session.write(flowFile, new StreamCallback() {
                     @Override
                     public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
                         final GenericData genericData = GenericData.get();
     
    -                    if (schemaLess) {
    -                        if (schema == null) {
    -                            schema = new Schema.Parser().parse(stringSchema);
    -                        }
    -                        try (final InputStream in = new BufferedInputStream(rawIn);
    -                             final OutputStream out = new BufferedOutputStream(rawOut)) {
    -                            final DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
    -                            final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
    -                            final GenericRecord record = reader.read(null, decoder);
    -
    -                            // Schemaless records are singletons, so both useContainer and wrapSingleRecord
    -                            // need to be true before we wrap it with an array
    +                    try (OutputStream out = new BufferedOutputStream(rawOut); InputStream in = new BufferedInputStream(rawIn)) {
    +                        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
    +                        if (schema != null) {
    +                            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
    +                            GenericRecord currRecord = reader.read(null, decoder);
                                 if (useContainer && wrapSingleRecord) {
                                     out.write('[');
                                 }
    -
    -                            final byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT : genericData.toString(record).getBytes(StandardCharsets.UTF_8);
    +                            byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT
    +                                    : (useAvroJson ? toAvroJSON(schema, currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
                                 out.write(outputBytes);
    -
                                 if (useContainer && wrapSingleRecord) {
                                     out.write(']');
                                 }
    -                        }
    -                    } else {
    -                        try (final InputStream in = new BufferedInputStream(rawIn);
    -                             final OutputStream out = new BufferedOutputStream(rawOut);
    -                             final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
    -
    -                            int recordCount = 0;
    -                            GenericRecord currRecord = null;
    -                            if (reader.hasNext()) {
    -                                currRecord = reader.next();
    -                                recordCount++;
    -                            }
    -
    -                            // Open container if desired output is an array format and there are are multiple records or
    -                            // if configured to wrap single record
    -                            if (reader.hasNext() && useContainer || wrapSingleRecord) {
    -                                out.write('[');
    -                            }
    -
    -                            // Determine the initial output record, inclusive if we should have an empty set of Avro records
    -                            final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
    -                            out.write(outputBytes);
    -
    -                            while (reader.hasNext()) {
    -                                if (useContainer) {
    -                                    out.write(',');
    -                                } else {
    -                                    out.write('\n');
    +                        } else {
    +                            try (DataFileStream<GenericRecord> stream = new DataFileStream<>(in, reader)) {
    +                                int recordCount = 0;
    +                                GenericRecord currRecord = null;
    +                                if (stream.hasNext()) {
    +                                    currRecord = stream.next();
    +                                    recordCount++;
    +                                }
    +                                if (stream.hasNext() && useContainer || wrapSingleRecord) {
    +                                    out.write('[');
    +                                }
    +                                byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT
    +                                        : (useAvroJson ? toAvroJSON(stream.getSchema(), currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
    --- End diff --
    
    Looks like toString() is still being used for encoding here; Ryan recommended a different encoder for normal JSON. Not sure what that entails, though. @rdblue, can you elaborate on the approach? At first glance it looks like you might need to subclass [JsonEncoder](https://github.com/apache/avro/blob/release-1.7.7/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java) to handle unions differently.
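    For context, the "layer" in question is Avro's JSON encoding of unions, which wraps the value in a single-key object named after the union branch, e.g. `{"string": "a"}` instead of `"a"`. The sketch below is purely illustrative (plain Java, no Avro dependency; `unwrapUnions` and `AVRO_TYPE_KEYS` are hypothetical names, and the JSON tree is modeled with Maps): it shows the kind of wrapper-stripping a union-aware encoder would effectively perform.

    ```java
    import java.util.*;

    public class UnionUnwrap {
        // Avro primitive type names that can key a union wrapper object.
        // (Named types such as records and enums are keyed by their full name,
        // which this simple sketch does not attempt to recognize.)
        static final Set<String> AVRO_TYPE_KEYS = new HashSet<>(Arrays.asList(
                "null", "boolean", "int", "long", "float", "double", "bytes", "string"));

        // Recursively replace single-entry {"<type>": value} wrappers with the
        // bare value, descending into nested maps.
        static Object unwrapUnions(Object node) {
            if (node instanceof Map) {
                Map<?, ?> map = (Map<?, ?>) node;
                if (map.size() == 1) {
                    Map.Entry<?, ?> e = map.entrySet().iterator().next();
                    if (AVRO_TYPE_KEYS.contains(e.getKey())) {
                        return unwrapUnions(e.getValue());
                    }
                }
                Map<Object, Object> out = new LinkedHashMap<>();
                for (Map.Entry<?, ?> e : map.entrySet()) {
                    out.put(e.getKey(), unwrapUnions(e.getValue()));
                }
                return out;
            }
            return node;
        }

        public static void main(String[] args) {
            Map<Object, Object> union = new LinkedHashMap<>();
            union.put("string", "a");
            Map<Object, Object> avroJson = new LinkedHashMap<>();
            avroJson.put("name", union);           // Avro-JSON: {"name": {"string": "a"}}
            System.out.println(unwrapUnions(avroJson)); // prints {name=a}
        }
    }
    ```

    Note the ambiguity: a genuine record field named `string` would be misread as a union wrapper by this heuristic, which is presumably why a schema-aware encoder (rather than text post-processing) is the more robust route.
    
    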


> Add option to use standard JSON or Avro-JSON to ConvertAvroToJSON
> -----------------------------------------------------------------
>
>                 Key: NIFI-969
>                 URL: https://issues.apache.org/jira/browse/NIFI-969
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>    Affects Versions: 0.3.0
>            Reporter: Ryan Blue
>            Assignee: Oleg Zhurakousky
>
> ConvertAvroToJSON uses {{GenericData#toString(GenericRecord)}} to convert to JSON. This produces [Avro-JSON|https://avro.apache.org/docs/1.7.7/spec.html#json_encoding], which probably isn't what most users want because it adds an unexpected layer for Avro union types:
> bq. the union schema {{["null","string",...]}} ... would encode: the string {{"a"}} as {{{"string": "a"}}} ...
> It would be good to have a conversion that doesn't add those layers as an option.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)