[
https://issues.apache.org/jira/browse/NIFI-969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15651403#comment-15651403
]
ASF GitHub Bot commented on NIFI-969:
-------------------------------------
Github user mattyb149 commented on a diff in the pull request:
https://github.com/apache/nifi/pull/1179#discussion_r87228142
--- Diff: nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java ---
@@ -136,81 +167,54 @@ public void onTrigger(ProcessContext context, ProcessSession session) throws Pro
             return;
         }
-        final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue();
-        final boolean useContainer = containerOption.equals(CONTAINER_ARRAY);
-        // Wrap a single record (inclusive of no records) only when a container is being used
-        final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer;
-
-        final String stringSchema = context.getProperty(SCHEMA).getValue();
-        final boolean schemaLess = stringSchema != null;
-
         try {
             flowFile = session.write(flowFile, new StreamCallback() {
                 @Override
                 public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
                     final GenericData genericData = GenericData.get();
-                    if (schemaLess) {
-                        if (schema == null) {
-                            schema = new Schema.Parser().parse(stringSchema);
-                        }
-                        try (final InputStream in = new BufferedInputStream(rawIn);
-                             final OutputStream out = new BufferedOutputStream(rawOut)) {
-                            final DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
-                            final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
-                            final GenericRecord record = reader.read(null, decoder);
-
-                            // Schemaless records are singletons, so both useContainer and wrapSingleRecord
-                            // need to be true before we wrap it with an array
+                    try (OutputStream out = new BufferedOutputStream(rawOut); InputStream in = new BufferedInputStream(rawIn)) {
+                        DatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
+                        if (schema != null) {
+                            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
+                            GenericRecord currRecord = reader.read(null, decoder);
                             if (useContainer && wrapSingleRecord) {
                                 out.write('[');
                             }
-
-                            final byte[] outputBytes = (record == null) ? EMPTY_JSON_OBJECT : genericData.toString(record).getBytes(StandardCharsets.UTF_8);
+                            byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT
+                                    : (useAvroJson ? toAvroJSON(schema, currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
                             out.write(outputBytes);
-
                             if (useContainer && wrapSingleRecord) {
                                 out.write(']');
                             }
-                        }
-                    } else {
-                        try (final InputStream in = new BufferedInputStream(rawIn);
-                             final OutputStream out = new BufferedOutputStream(rawOut);
-                             final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
-
-                            int recordCount = 0;
-                            GenericRecord currRecord = null;
-                            if (reader.hasNext()) {
-                                currRecord = reader.next();
-                                recordCount++;
-                            }
-
-                            // Open container if desired output is an array format and there are are multiple records or
-                            // if configured to wrap single record
-                            if (reader.hasNext() && useContainer || wrapSingleRecord) {
-                                out.write('[');
-                            }
-
-                            // Determine the initial output record, inclusive if we should have an empty set of Avro records
-                            final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
-                            out.write(outputBytes);
-
-                            while (reader.hasNext()) {
-                                if (useContainer) {
-                                    out.write(',');
-                                } else {
-                                    out.write('\n');
+                        } else {
+                            try (DataFileStream<GenericRecord> stream = new DataFileStream<>(in, reader)) {
+                                int recordCount = 0;
+                                GenericRecord currRecord = null;
+                                if (stream.hasNext()) {
+                                    currRecord = stream.next();
+                                    recordCount++;
+                                }
+                                if (stream.hasNext() && useContainer || wrapSingleRecord) {
+                                    out.write('[');
+                                }
+                                byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT
+                                        : (useAvroJson ? toAvroJSON(stream.getSchema(), currRecord) : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
--- End diff ---
Looks like toString() is still being used for encoding here; Ryan recommended a different encoder for normal JSON. Not sure what that entails, though. @rdblue, can you elaborate on the approach? At first glance it looks like you might need to subclass [JsonEncoder](https://github.com/apache/avro/blob/release-1.7.7/lang/java/avro/src/main/java/org/apache/avro/io/JsonEncoder.java) to handle unions differently.
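One hypothetical alternative to subclassing JsonEncoder (not part of the patch under review) would be to post-process the decoded JSON tree and collapse the single-key union wrappers the issue describes, e.g. {"string": "a"} -> "a". The sketch below operates on plain java.util Maps/Lists for illustration; the PRIMITIVE_BRANCHES set covers only Avro primitive type names, so named-type branches (records, enums, fixed) and fields that happen to share a primitive type name would need schema-aware handling in a real implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: strip Avro-JSON union wrappers from a parsed JSON tree.
public class UnionUnwrapSketch {
    // Avro primitive type names that appear as union branch keys in Avro-JSON.
    private static final Set<String> PRIMITIVE_BRANCHES =
            Set.of("null", "boolean", "int", "long", "float", "double", "bytes", "string");

    /** Recursively replace single-key {"<type>": value} union wrappers with the bare value. */
    public static Object unwrap(Object node) {
        if (node instanceof Map<?, ?> map) {
            if (map.size() == 1) {
                Map.Entry<?, ?> only = map.entrySet().iterator().next();
                if (PRIMITIVE_BRANCHES.contains(only.getKey())) {
                    return unwrap(only.getValue());  // collapse the union layer
                }
            }
            Map<String, Object> out = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : map.entrySet()) {
                out.put((String) e.getKey(), unwrap(e.getValue()));
            }
            return out;
        }
        if (node instanceof List<?> list) {
            List<Object> out = new ArrayList<>();
            for (Object e : list) {
                out.add(unwrap(e));
            }
            return out;
        }
        return node;  // scalars pass through unchanged
    }

    public static void main(String[] args) {
        // {"name": {"string": "a"}} becomes {"name": "a"}
        System.out.println(unwrap(Map.of("name", Map.of("string", "a"))));
    }
}
```

The ambiguity this glosses over is exactly why the Avro spec keeps the wrapper: without it a reader cannot always tell which union branch was written.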
> Add option to use standard JSON or Avro-JSON to ConvertAvroToJSON
> -----------------------------------------------------------------
>
> Key: NIFI-969
> URL: https://issues.apache.org/jira/browse/NIFI-969
> Project: Apache NiFi
> Issue Type: Improvement
> Components: Extensions
> Affects Versions: 0.3.0
> Reporter: Ryan Blue
> Assignee: Oleg Zhurakousky
>
> ConvertAvroToJSON uses {{GenericData#toString(GenericRecord)}} to convert to
> JSON. This produces
> [Avro-JSON|https://avro.apache.org/docs/1.7.7/spec.html#json_encoding], which
> probably isn't what most users want because it adds an unexpected layer for
> Avro union types:
> bq. the union schema {{["null","string",...]}} ... would encode: the string
> {{"a"}} as {{{"string": "a"}}} ...
> It would be good to have a conversion that doesn't add those layers as an
> option.
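To make the quoted example concrete, the two output shapes being contrasted for a field of union schema ["null", "string"] holding the string "a" would look like this (illustrative only, not NiFi code):

```java
// Illustration of the two JSON shapes for a nullable-string field "name" = "a".
// Avro-JSON keeps the union branch name so a reader knows which member was
// written; plain JSON drops that layer, which is what most consumers expect.
public class UnionShapeDemo {
    public static String avroJson()  { return "{\"name\": {\"string\": \"a\"}}"; }
    public static String plainJson() { return "{\"name\": \"a\"}"; }

    public static void main(String[] args) {
        System.out.println("Avro-JSON:  " + avroJson());
        System.out.println("plain JSON: " + plainJson());
    }
}
```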
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)