Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1179#discussion_r87228142
--- Diff: nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java ---
@@ -136,81 +167,54 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
             return;
         }
-        final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue();
-        final boolean useContainer = containerOption.equals(CONTAINER_ARRAY);
-        // Wrap a single record (inclusive of no records) only when a container is being used
-        final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer;
-
-        final String stringSchema = context.getProperty(SCHEMA).getValue();
-        final boolean schemaLess = stringSchema != null;
-
         try {
             flowFile = session.write(flowFile, new StreamCallback() {
                 @Override
                 public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
                     final GenericData genericData = GenericData.get();
-                    if (schemaLess) {
-                        if (schema == null) {
-                            schema = new Schema.Parser().parse(stringSchema);
-                        }
-                        try (final InputStream in = new BufferedInputStream(rawIn);
-                             final OutputStream out = new BufferedOutputStream(rawOut)) {
-                            final DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
-                            final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
-                            final GenericRecord record = reader.read(null, decoder);
-
-                            // Schemaless records are singletons, so both useContainer and wrapSingleRecord
-                            // need to be true before we wrap it with an array
+                    try (OutputStream out = new BufferedOutputStream(rawOut); InputStream in = new BufferedInputStream(rawIn)) {
+                        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
+                        if (schema != null) {
+                            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
+                            GenericRecord currRecord = reader.read(null, decoder);
                             if (useContainer && wrapSingleRecord) {
                                 out.write('[');
                             }
-
-                            final byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT : genericData.toString(record).getBytes(StandardCharsets.UTF_8);
+                            byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT
+                                    : (useAvroJson ? toAvroJSON(schema, currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
                             out.write(outputBytes);
-
                             if (useContainer && wrapSingleRecord) {
                                 out.write(']');
                             }
-                        }
-                    } else {
-                        try (final InputStream in = new BufferedInputStream(rawIn);
-                             final OutputStream out = new BufferedOutputStream(rawOut);
-                             final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
-
-                            int recordCount = 0;
-                            GenericRecord currRecord = null;
-                            if (reader.hasNext()) {
-                                currRecord = reader.next();
-                                recordCount++;
-                            }
-
-                            // Open container if desired output is an array format and there are are multiple records or
-                            // if configured to wrap single record
-                            if (reader.hasNext() && useContainer || wrapSingleRecord) {
-                                out.write('[');
-                            }
-
-                            // Determine the initial output record, inclusive if we should have an empty set of Avro records
-                            final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
-                            out.write(outputBytes);
-
-                            while (reader.hasNext()) {
-                                if (useContainer) {
-                                    out.write(',');
-                                } else {
-                                    out.write('\n');
+                        } else {
+                            try (DataFileStream<GenericRecord> stream = new DataFileStream<>(in, reader)) {
+                                int recordCount = 0;
+                                GenericRecord currRecord = null;
+                                if (stream.hasNext()) {
+                                    currRecord = stream.next();
+                                    recordCount++;
+                                }
+                                if (stream.hasNext() && useContainer || wrapSingleRecord) {
+                                    out.write('[');
+                                }
+                                byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT
+                                        : (useAvroJson ? toAvroJSON(stream.getSchema(), currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
--- End diff ---
Looks like toString() is still being used for encoding here; Ryan recommended a different encoder for normal JSON. I'm not sure what that entails, though. @rdblue, can you elaborate on the approach? At first glance it looks like you might need to subclass [JsonEncoder](https://github.com/apache/avro/blob/release-1.7.7/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java) to handle unions differently.
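For reference, here is a rough sketch (not from this PR; assumes Avro 1.7.x on the classpath, and the helper name `toAvroJSON` is just borrowed from the diff for illustration) of encoding a record through the stock JsonEncoder. It shows the union-tagging behavior that a subclass would presumably need to change for "normal" JSON:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;

public class JsonEncoderSketch {

    // Hypothetical helper, analogous to the toAvroJSON(schema, record) call in
    // the diff: serialize one GenericRecord to JSON via the stock JsonEncoder.
    static byte[] toAvroJSON(Schema schema, GenericRecord record) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
        JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, baos);
        writer.write(record, encoder);
        encoder.flush();
        return baos.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // A record with a union field, to demonstrate the tagging.
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
                + "{\"name\":\"name\",\"type\":[\"null\",\"string\"]}]}");
        GenericRecord record = new GenericData.Record(schema);
        record.put("name", "alice");
        // The stock encoder tags union members by type, producing something
        // like {"name":{"string":"alice"}} rather than {"name":"alice"}.
        System.out.println(new String(toAvroJSON(schema, record), StandardCharsets.UTF_8));
    }
}
```

The extra `{"string": ...}` wrapper on union fields is exactly the Avro-flavored JSON behavior in question, so the subclassing would be about emitting the bare value instead.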
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---