This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9fbe6aab74 NIFI-13630 Handle Map Avro Type in PutBigQuery
9fbe6aab74 is described below
commit 9fbe6aab74f133f367c6cd386b926643d7a88828
Author: Juldrixx <[email protected]>
AuthorDate: Tue Aug 6 15:49:37 2024 +0200
NIFI-13630 Handle Map Avro Type in PutBigQuery
This closes #9151
Signed-off-by: David Handermann <[email protected]>
---
.../processors/gcp/bigquery/proto/ProtoUtils.java | 16 ++++-
.../processors/gcp/bigquery/PutBigQueryTest.java | 82 ++++++++++++++++++++++
2 files changed, 96 insertions(+), 2 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java
index 823e0e11d0..8dfda739e0 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/proto/ProtoUtils.java
@@ -46,8 +46,20 @@ public class ProtoUtils {
switch (field.getType()) {
case MESSAGE:
if (field.isRepeated()) {
- Collection collection = value.getClass().isArray() ? Arrays.asList((Object[]) value) : (Collection) value;
- collection.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), (Map<String, Object>) act, tableSchema)));
+ final Collection<Map<String, Object>> valueMaps;
+ if (value instanceof Object[] arrayValue) {
+ valueMaps = Arrays.stream(arrayValue).map(item -> (Map<String, Object>) item).toList();
+ } else if (value instanceof Map<?, ?> mapValue) { // Avro map: each entry becomes one key/value element of the repeated STRUCT
+ valueMaps = mapValue.entrySet().stream().map(entry -> {
+ final Map<String, Object> keyValue = new java.util.HashMap<>(); // not Map.of(): it throws NPE on null, and Avro map values may be null
+ keyValue.put("key", entry.getKey());
+ keyValue.put("value", entry.getValue());
+ return keyValue;
+ }).toList();
+ } else {
+ valueMaps = (Collection<Map<String, Object>>) value;
+ }
+ valueMaps.forEach(act -> builder.addRepeatedField(field, createMessage(field.getMessageType(), act, tableSchema)));
} else {
builder.setField(field, createMessage(field.getMessageType(), (Map<String, Object>) value, tableSchema));
}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java
index 3ff7edf68c..6e6c513801 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/test/java/org/apache/nifi/processors/gcp/bigquery/PutBigQueryTest.java
@@ -44,6 +44,7 @@ import java.util.stream.Stream;
import org.apache.nifi.csv.CSVReader;
import org.apache.nifi.csv.CSVUtils;
import org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
+import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Processor;
import
org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService;
@@ -457,6 +458,26 @@ public class PutBigQueryTest {
runner.assertTransferCount(PutBigQuery.REL_SUCCESS, 1);
}
+ @Test
+ void testMapFieldSchema() throws Exception { // Avro map field written to a REPEATED STRUCT<key, value> BigQuery column
+
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
+
+ TableSchema myTableSchema = mockJsonTableSchema();
+
+ when(writeStream.getTableSchema()).thenReturn(myTableSchema);
+
+ when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
+
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
+
+ decorateWithJsonRecordReaderWithSchema(runner); // JSON reader with an explicit Avro map<string> schema for "field"
+ runner.setProperty(PutBigQuery.RECORD_READER, "jsonReader");
+
+ runner.enqueue(jsonContent());
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(PutBigQuery.REL_SUCCESS); // NOTE(review): only checks routing; the appended ProtoRows contents are not verified
+ }
+
private void decorateWithRecordReader(TestRunner runner) throws
InitializationException {
CSVReader csvReader = new CSVReader();
runner.addControllerService("csvReader", csvReader);
@@ -484,6 +505,30 @@ public class PutBigQueryTest {
runner.enableControllerService(csvReader);
}
+ private void decorateWithJsonRecordReaderWithSchema(TestRunner runner)
throws InitializationException { // registers and enables JsonTreeReader "jsonReader" whose inline Avro schema declares "field" as map<string>
+ String recordReaderSchema = """
+ {
+ "name": "recordFormatName",
+ "namespace": "nifi.examples",
+ "type": "record",
+ "fields": [
+ {
+ "name": "field",
+ "type": {
+ "type": "map",
+ "values": "string"
+ }
+ }
+ ]
+ }""";
+
+ JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("jsonReader", jsonReader);
+ runner.setProperty(jsonReader,
SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY,
SchemaAccessUtils.SCHEMA_TEXT_PROPERTY); // take the schema from the SCHEMA_TEXT property set on the next line
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT,
recordReaderSchema);
+ runner.enableControllerService(jsonReader);
+ }
+
private TableSchema mockTableSchema(String name1, TableFieldSchema.Type
type1, String name2, TableFieldSchema.Type type2) {
TableSchema myTableSchema = mock(TableSchema.class);
@@ -503,6 +548,30 @@ public class PutBigQueryTest {
return myTableSchema;
}
+ private TableSchema mockJsonTableSchema() { // mocked BigQuery schema: "field" REPEATED STRUCT<key STRING REQUIRED, value STRING NULLABLE>
+ TableSchema myTableSchema = mock(TableSchema.class);
+
+ TableFieldSchema keyFieldSchema = mock(TableFieldSchema.class);
+ when(keyFieldSchema.getName()).thenReturn("key");
+
when(keyFieldSchema.getType()).thenReturn(TableFieldSchema.Type.STRING);
+
when(keyFieldSchema.getMode()).thenReturn(TableFieldSchema.Mode.REQUIRED);
+
+ TableFieldSchema valueFieldSchema = mock(TableFieldSchema.class);
+ when(valueFieldSchema.getName()).thenReturn("value");
+
when(valueFieldSchema.getType()).thenReturn(TableFieldSchema.Type.STRING);
+
when(valueFieldSchema.getMode()).thenReturn(TableFieldSchema.Mode.NULLABLE);
+
+ TableFieldSchema tableFieldSchemaId = mock(TableFieldSchema.class); // top-level "field" column holding the key/value sub-fields
+ when(tableFieldSchemaId.getName()).thenReturn("field");
+
when(tableFieldSchemaId.getType()).thenReturn(TableFieldSchema.Type.STRUCT);
+
when(tableFieldSchemaId.getMode()).thenReturn(TableFieldSchema.Mode.REPEATED);
+
when(tableFieldSchemaId.getFieldsList()).thenReturn(List.of(keyFieldSchema,
valueFieldSchema));
+
+
when(myTableSchema.getFieldsList()).thenReturn(List.of(tableFieldSchemaId));
+
+ return myTableSchema;
+ }
+
private String csvContentWithLines(int lineNum) {
StringBuilder builder = new StringBuilder();
builder.append(CSV_HEADER);
@@ -516,4 +585,17 @@ public class PutBigQueryTest {
return builder.toString();
}
+
+ private String jsonContent() { // one JSON record whose "field" object holds five string entries (FIELD_1..FIELD_5)
+ return """
+ {
+ "field": {
+ "FIELD_1": "field_1",
+ "FIELD_2": "field_2",
+ "FIELD_3": "field_3",
+ "FIELD_4": "field_4",
+ "FIELD_5": "field_5"
+ }
+ }""";
+ }
}