bejancsaba commented on code in PR #7140:
URL: https://github.com/apache/nifi/pull/7140#discussion_r1161337789


##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java:
##########
@@ -303,6 +301,7 @@ private void finishProcessing(ProcessSession session, 
FlowFile flowFile, StreamW
             flowFile = session.putAttribute(flowFile, 
BigQueryAttributes.JOB_NB_RECORDS_ATTR, isBatch() ? "0" : 
String.valueOf(appendSuccessCount.get() * recordBatchCount));
             session.penalize(flowFile);
             session.transfer(flowFile, REL_FAILURE);
+            error.set(null); // set error to null for next execution

Review Comment:
   Thanks for catching this. What do you think about adding a unit test 
covering this scenario (failing before the change and passing after it)
   ```
       @Test
       void testNextFlowFileProcessedWhenIntermittentErrorResolved() {
           
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
           TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, 
TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
           when(writeStream.getTableSchema()).thenReturn(myTableSchema);
           when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
               .thenReturn(ApiFutures.immediateFailedFuture(new 
StatusRuntimeException(Status.INTERNAL)))
               
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
   
           runner.setProperty(PutBigQuery.RETRY_COUNT, "0");
   
           runner.enqueue(csvContentWithLines(1));
           runner.enqueue(csvContentWithLines(1));
           runner.run(2);
   
           verify(streamWriter, times(2)).append(any(ProtoRows.class), 
anyLong());
   
           runner.assertQueueEmpty();
           runner.assertTransferCount(PutBigQuery.REL_FAILURE, 1);
           runner.assertTransferCount(PutBigQuery.REL_SUCCESS, 1);
       }
   ```



##########
nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java:
##########
@@ -434,6 +433,9 @@ private static Map<String, Object> 
convertMapRecord(Map<String, Object> map) {
         Map<String, Object> result = new HashMap<>();
         for (String key : map.keySet()) {
             Object obj = map.get(key);
+            // BigQuery is not case sensitive on the column names but the 
protobuf message
+            // expect all column names to be lower case
+            key = key.toLowerCase();

Review Comment:
   This is good to know. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to