This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 0344bd3e25 NIFI-11608 Fixed Expression Language Evaluation in 
PutBigQuery
0344bd3e25 is described below

commit 0344bd3e25c91e265d48a0410081af51ecb49092
Author: Steven Matison <[email protected]>
AuthorDate: Wed May 31 07:31:27 2023 -0400

    NIFI-11608 Fixed Expression Language Evaluation in PutBigQuery
    
    This closes #7316
    
    Signed-off-by: David Handermann <[email protected]>
    (cherry picked from commit 645618a6095a94488f845cb427a4d4768161953d)
---
 .../nifi/processors/gcp/bigquery/PutBigQuery.java  | 34 ++++++++++++----------
 1 file changed, 18 insertions(+), 16 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
index edcfcfd14b..54105d9592 100644
--- 
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
+++ 
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
@@ -115,13 +115,11 @@ public class PutBigQuery extends 
AbstractBigQueryProcessor {
     private final AtomicReference<Exception> error = new AtomicReference<>();
     private final AtomicInteger appendSuccessCount = new AtomicInteger(0);
     private final Phaser inflightRequestCount = new Phaser(1);
-    private TableName tableName = null;
     private BigQueryWriteClient writeClient = null;
     private StreamWriter streamWriter = null;
     private String transferType;
     private int maxRetryCount;
     private int recordBatchCount;
-    private boolean skipInvalidRows;
 
     public static final PropertyDescriptor PROJECT_ID = new 
PropertyDescriptor.Builder()
         .fromPropertyDescriptor(AbstractBigQueryProcessor.PROJECT_ID)
@@ -185,12 +183,9 @@ public class PutBigQuery extends AbstractBigQueryProcessor 
{
     @OnScheduled
     public void onScheduled(ProcessContext context) {
         super.onScheduled(context);
-
         transferType = context.getProperty(TRANSFER_TYPE).getValue();
         maxRetryCount = context.getProperty(RETRY_COUNT).asInteger();
-        skipInvalidRows = context.getProperty(SKIP_INVALID_ROWS).asBoolean();
         recordBatchCount = 
context.getProperty(APPEND_RECORD_COUNT).asInteger();
-        tableName = TableName.of(context.getProperty(PROJECT_ID).getValue(), 
context.getProperty(DATASET).getValue(), 
context.getProperty(TABLE_NAME).getValue());
         writeClient = createWriteClient(getGoogleCredentials(context));
     }
 
@@ -201,30 +196,37 @@ public class PutBigQuery extends 
AbstractBigQueryProcessor {
 
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session)  {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final String projectId = context.getProperty(PROJECT_ID).getValue();
+        final String dataset = 
context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
+        final String dataTableName = 
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+        final TableName tableName = TableName.of(projectId, dataset, 
dataTableName);
+
         WriteStream writeStream;
         Descriptors.Descriptor protoDescriptor;
         try {
-            writeStream = createWriteStream();
+            writeStream = createWriteStream(tableName);
             protoDescriptor = 
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(writeStream.getTableSchema());
             streamWriter = createStreamWriter(writeStream.getName(), 
protoDescriptor, getGoogleCredentials(context));
         } catch (Descriptors.DescriptorValidationException | IOException e) {
             getLogger().error("Failed to create Big Query Stream Writer for 
writing", e);
             context.yield();
+            session.rollback();
             return;
         }
 
-        FlowFile flowFile = session.get();
-        if (flowFile == null) {
-            return;
-        }
-
+        final boolean skipInvalidRows = 
context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean();
         final RecordReaderFactory readerFactory = 
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
 
         int recordNumWritten;
         try {
             try (InputStream in = session.read(flowFile);
                     RecordReader reader = 
readerFactory.createRecordReader(flowFile, in, getLogger())) {
-                recordNumWritten = writeRecordsToStream(reader, 
protoDescriptor);
+                recordNumWritten = writeRecordsToStream(reader, 
protoDescriptor, skipInvalidRows);
             }
             flowFile = session.putAttribute(flowFile, 
BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten));
         } catch (Exception e) {
@@ -234,13 +236,13 @@ public class PutBigQuery extends 
AbstractBigQueryProcessor {
         }
     }
 
-    private int writeRecordsToStream(RecordReader reader, 
Descriptors.Descriptor descriptor) throws Exception {
+    private int writeRecordsToStream(RecordReader reader, 
Descriptors.Descriptor descriptor, boolean skipInvalidRows) throws Exception {
         Record currentRecord;
         int offset = 0;
         int recordNum = 0;
         ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
         while ((currentRecord = reader.nextRecord()) != null) {
-            DynamicMessage message = recordToProtoMessage(currentRecord, 
descriptor);
+            DynamicMessage message = recordToProtoMessage(currentRecord, 
descriptor, skipInvalidRows);
 
             if (message == null) {
                 continue;
@@ -262,7 +264,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         return recordNum;
     }
 
-    private DynamicMessage recordToProtoMessage(Record record, 
Descriptors.Descriptor descriptor) {
+    private DynamicMessage recordToProtoMessage(Record record, 
Descriptors.Descriptor descriptor, boolean skipInvalidRows) {
         Map<String, Object> valueMap = convertMapRecord(record.toMap());
         DynamicMessage message = null;
         try {
@@ -369,7 +371,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         }
     }
 
-    private WriteStream createWriteStream() {
+    private WriteStream createWriteStream(TableName tableName) {
         WriteStream.Type type = isBatch() ? WriteStream.Type.PENDING : 
WriteStream.Type.COMMITTED;
         CreateWriteStreamRequest createWriteStreamRequest = 
CreateWriteStreamRequest.newBuilder()
             .setParent(tableName.toString())

Reply via email to