Github user mattyb149 commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/1179#discussion_r87228142
  
    --- Diff: 
nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
 ---
    @@ -136,81 +167,54 @@ public void onTrigger(ProcessContext context, 
ProcessSession session) throws Pro
                 return;
             }
     
    -        final String containerOption = 
context.getProperty(CONTAINER_OPTIONS).getValue();
    -        final boolean useContainer = 
containerOption.equals(CONTAINER_ARRAY);
    -        // Wrap a single record (inclusive of no records) only when a 
container is being used
    -        final boolean wrapSingleRecord = 
context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer;
    -
    -        final String stringSchema = context.getProperty(SCHEMA).getValue();
    -        final boolean schemaLess = stringSchema != null;
    -
             try {
                 flowFile = session.write(flowFile, new StreamCallback() {
                     @Override
                     public void process(final InputStream rawIn, final 
OutputStream rawOut) throws IOException {
                         final GenericData genericData = GenericData.get();
     
    -                    if (schemaLess) {
    -                        if (schema == null) {
    -                            schema = new 
Schema.Parser().parse(stringSchema);
    -                        }
    -                        try (final InputStream in = new 
BufferedInputStream(rawIn);
    -                             final OutputStream out = new 
BufferedOutputStream(rawOut)) {
    -                            final DatumReader<GenericRecord> reader = new 
GenericDatumReader<GenericRecord>(schema);
    -                            final BinaryDecoder decoder = 
DecoderFactory.get().binaryDecoder(in, null);
    -                            final GenericRecord record = reader.read(null, 
decoder);
    -
    -                            // Schemaless records are singletons, so both 
useContainer and wrapSingleRecord
    -                            // need to be true before we wrap it with an 
array
    +                    try (OutputStream out = new 
BufferedOutputStream(rawOut); InputStream in = new BufferedInputStream(rawIn)) {
    +                        DatumReader<GenericRecord> reader = new 
GenericDatumReader<GenericRecord>(schema);
    +                        if (schema != null) {
    +                            BinaryDecoder decoder = 
DecoderFactory.get().binaryDecoder(in, null);
    +                            GenericRecord currRecord = reader.read(null, 
decoder);
                                 if (useContainer && wrapSingleRecord) {
                                     out.write('[');
                                 }
    -
    -                            final byte[] outputBytes = (record == null) ? 
EMPTY_JSON_OBJECT : 
genericData.toString(record).getBytes(StandardCharsets.UTF_8);
    +                            byte[] outputBytes = (currRecord == null) ? 
EMPTY_JSON_OBJECT
    +                                    : (useAvroJson ? toAvroJSON(schema, 
currRecord) : 
genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
                                 out.write(outputBytes);
    -
                                 if (useContainer && wrapSingleRecord) {
                                     out.write(']');
                                 }
    -                        }
    -                    } else {
    -                        try (final InputStream in = new 
BufferedInputStream(rawIn);
    -                             final OutputStream out = new 
BufferedOutputStream(rawOut);
    -                             final DataFileStream<GenericRecord> reader = 
new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
    -
    -                            int recordCount = 0;
    -                            GenericRecord currRecord = null;
    -                            if (reader.hasNext()) {
    -                                currRecord = reader.next();
    -                                recordCount++;
    -                            }
    -
    -                            // Open container if desired output is an 
array format and there are multiple records or
    -                            // if configured to wrap single record
    -                            if (reader.hasNext() && useContainer || 
wrapSingleRecord) {
    -                                out.write('[');
    -                            }
    -
    -                            // Determine the initial output record, 
inclusive if we should have an empty set of Avro records
    -                            final byte[] outputBytes = (currRecord == 
null) ? EMPTY_JSON_OBJECT : 
genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
    -                            out.write(outputBytes);
    -
    -                            while (reader.hasNext()) {
    -                                if (useContainer) {
    -                                    out.write(',');
    -                                } else {
    -                                    out.write('\n');
    +                        } else {
    +                            try (DataFileStream<GenericRecord> stream = 
new DataFileStream<>(in, reader)) {
    +                                int recordCount = 0;
    +                                GenericRecord currRecord = null;
    +                                if (stream.hasNext()) {
    +                                    currRecord = stream.next();
    +                                    recordCount++;
    +                                }
    +                                if (stream.hasNext() && useContainer || 
wrapSingleRecord) {
    +                                    out.write('[');
    +                                }
    +                                byte[] outputBytes = (currRecord == null) 
? EMPTY_JSON_OBJECT
    +                                        : (useAvroJson ? 
toAvroJSON(stream.getSchema(), currRecord) : 
genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
    --- End diff --
    
    Looks like toString() is still being used for encoding here; Ryan 
recommended a different encoder for normal JSON. I'm not sure what that entails, 
though — @rdblue, can you elaborate on the approach? At first glance it looks 
like you might need to subclass 
[JsonEncoder](https://github.com/apache/avro/blob/release-1.7.7/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java)
 to handle unions differently.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

Reply via email to