This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 0344bd3e25 NIFI-11608 Fixed Expression Language Evaluation in
PutBigQuery
0344bd3e25 is described below
commit 0344bd3e25c91e265d48a0410081af51ecb49092
Author: Steven Matison <[email protected]>
AuthorDate: Wed May 31 07:31:27 2023 -0400
NIFI-11608 Fixed Expression Language Evaluation in PutBigQuery
This closes #7316
Signed-off-by: David Handermann <[email protected]>
(cherry picked from commit 645618a6095a94488f845cb427a4d4768161953d)
---
.../nifi/processors/gcp/bigquery/PutBigQuery.java | 34 ++++++++++++----------
1 file changed, 18 insertions(+), 16 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
index edcfcfd14b..54105d9592 100644
---
a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
+++
b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
@@ -115,13 +115,11 @@ public class PutBigQuery extends
AbstractBigQueryProcessor {
private final AtomicReference<Exception> error = new AtomicReference<>();
private final AtomicInteger appendSuccessCount = new AtomicInteger(0);
private final Phaser inflightRequestCount = new Phaser(1);
- private TableName tableName = null;
private BigQueryWriteClient writeClient = null;
private StreamWriter streamWriter = null;
private String transferType;
private int maxRetryCount;
private int recordBatchCount;
- private boolean skipInvalidRows;
public static final PropertyDescriptor PROJECT_ID = new
PropertyDescriptor.Builder()
.fromPropertyDescriptor(AbstractBigQueryProcessor.PROJECT_ID)
@@ -185,12 +183,9 @@ public class PutBigQuery extends AbstractBigQueryProcessor
{
@OnScheduled
public void onScheduled(ProcessContext context) {
super.onScheduled(context);
-
transferType = context.getProperty(TRANSFER_TYPE).getValue();
maxRetryCount = context.getProperty(RETRY_COUNT).asInteger();
- skipInvalidRows = context.getProperty(SKIP_INVALID_ROWS).asBoolean();
recordBatchCount =
context.getProperty(APPEND_RECORD_COUNT).asInteger();
- tableName = TableName.of(context.getProperty(PROJECT_ID).getValue(),
context.getProperty(DATASET).getValue(),
context.getProperty(TABLE_NAME).getValue());
writeClient = createWriteClient(getGoogleCredentials(context));
}
@@ -201,30 +196,37 @@ public class PutBigQuery extends
AbstractBigQueryProcessor {
@Override
public void onTrigger(ProcessContext context, ProcessSession session) {
+ FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ final String projectId = context.getProperty(PROJECT_ID).getValue();
+ final String dataset =
context.getProperty(DATASET).evaluateAttributeExpressions(flowFile).getValue();
+ final String dataTableName =
context.getProperty(TABLE_NAME).evaluateAttributeExpressions(flowFile).getValue();
+ final TableName tableName = TableName.of(projectId, dataset,
dataTableName);
+
WriteStream writeStream;
Descriptors.Descriptor protoDescriptor;
try {
- writeStream = createWriteStream();
+ writeStream = createWriteStream(tableName);
protoDescriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(writeStream.getTableSchema());
streamWriter = createStreamWriter(writeStream.getName(),
protoDescriptor, getGoogleCredentials(context));
} catch (Descriptors.DescriptorValidationException | IOException e) {
getLogger().error("Failed to create Big Query Stream Writer for
writing", e);
context.yield();
+ session.rollback();
return;
}
- FlowFile flowFile = session.get();
- if (flowFile == null) {
- return;
- }
-
+ final boolean skipInvalidRows =
context.getProperty(SKIP_INVALID_ROWS).evaluateAttributeExpressions(flowFile).asBoolean();
final RecordReaderFactory readerFactory =
context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
int recordNumWritten;
try {
try (InputStream in = session.read(flowFile);
RecordReader reader =
readerFactory.createRecordReader(flowFile, in, getLogger())) {
- recordNumWritten = writeRecordsToStream(reader,
protoDescriptor);
+ recordNumWritten = writeRecordsToStream(reader,
protoDescriptor, skipInvalidRows);
}
flowFile = session.putAttribute(flowFile,
BigQueryAttributes.JOB_NB_RECORDS_ATTR, Integer.toString(recordNumWritten));
} catch (Exception e) {
@@ -234,13 +236,13 @@ public class PutBigQuery extends
AbstractBigQueryProcessor {
}
}
- private int writeRecordsToStream(RecordReader reader,
Descriptors.Descriptor descriptor) throws Exception {
+ private int writeRecordsToStream(RecordReader reader,
Descriptors.Descriptor descriptor, boolean skipInvalidRows) throws Exception {
Record currentRecord;
int offset = 0;
int recordNum = 0;
ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
while ((currentRecord = reader.nextRecord()) != null) {
- DynamicMessage message = recordToProtoMessage(currentRecord,
descriptor);
+ DynamicMessage message = recordToProtoMessage(currentRecord,
descriptor, skipInvalidRows);
if (message == null) {
continue;
@@ -262,7 +264,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
return recordNum;
}
- private DynamicMessage recordToProtoMessage(Record record,
Descriptors.Descriptor descriptor) {
+ private DynamicMessage recordToProtoMessage(Record record,
Descriptors.Descriptor descriptor, boolean skipInvalidRows) {
Map<String, Object> valueMap = convertMapRecord(record.toMap());
DynamicMessage message = null;
try {
@@ -369,7 +371,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
}
}
- private WriteStream createWriteStream() {
+ private WriteStream createWriteStream(TableName tableName) {
WriteStream.Type type = isBatch() ? WriteStream.Type.PENDING :
WriteStream.Type.COMMITTED;
CreateWriteStreamRequest createWriteStreamRequest =
CreateWriteStreamRequest.newBuilder()
.setParent(tableName.toString())