This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 610dbb86eb NIFI-11779 - Override endpoint in PutBigQuery
610dbb86eb is described below

commit 610dbb86ebc15a26e6c238da5a906abfe38f2ce1
Author: Pierre Villard <[email protected]>
AuthorDate: Wed Jul 5 17:45:55 2023 +0200

    NIFI-11779 - Override endpoint in PutBigQuery
    
    review
    
    use default from code instead of hardcoding
    
    Signed-off-by: Matt Burgess <[email protected]>
---
 .../nifi/processors/gcp/bigquery/PutBigQuery.java  | 32 +++++++++++++++++++---
 1 file changed, 28 insertions(+), 4 deletions(-)

diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
index 54105d9592..d3a274b9b1 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
@@ -41,6 +41,7 @@ import com.google.cloud.bigquery.storage.v1.StorageError;
 import com.google.cloud.bigquery.storage.v1.StreamWriter;
 import com.google.cloud.bigquery.storage.v1.TableName;
 import com.google.cloud.bigquery.storage.v1.WriteStream;
+import com.google.cloud.bigquery.storage.v1.stub.BigQueryWriteStubSettings;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.DynamicMessage;
 import io.grpc.Status;
@@ -118,6 +119,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
     private BigQueryWriteClient writeClient = null;
     private StreamWriter streamWriter = null;
     private String transferType;
+    private String endpoint;
     private int maxRetryCount;
     private int recordBatchCount;
 
@@ -126,6 +128,18 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         .required(true)
         .build();
 
+    public static final PropertyDescriptor BIGQUERY_API_ENDPOINT = new PropertyDescriptor.Builder()
+        .name("bigquery-api-endpoint")
+        .displayName("BigQuery API Endpoint")
+        .description("Can be used to override the default BigQuery endpoint. Default is "
+                + BigQueryWriteStubSettings.getDefaultEndpoint() + ". "
+                + "Format must be hostname:port.")
+        .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+        .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+        .required(true)
+        .defaultValue(BigQueryWriteStubSettings.getDefaultEndpoint())
+        .build();
+
     static final PropertyDescriptor TRANSFER_TYPE = new PropertyDescriptor.Builder()
         .name(TRANSFER_TYPE_NAME)
         .displayName("Transfer Type")
@@ -165,6 +179,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
     private static final List<PropertyDescriptor> DESCRIPTORS = Stream.of(
         GCP_CREDENTIALS_PROVIDER_SERVICE,
         PROJECT_ID,
+        BIGQUERY_API_ENDPOINT,
         DATASET,
         TABLE_NAME,
         RECORD_READER,
@@ -186,6 +201,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
         transferType = context.getProperty(TRANSFER_TYPE).getValue();
         maxRetryCount = context.getProperty(RETRY_COUNT).asInteger();
         recordBatchCount = context.getProperty(APPEND_RECORD_COUNT).asInteger();
+        endpoint = context.getProperty(BIGQUERY_API_ENDPOINT).evaluateAttributeExpressions().getValue();
         writeClient = createWriteClient(getGoogleCredentials(context));
     }
 
@@ -384,7 +400,11 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
     protected BigQueryWriteClient createWriteClient(GoogleCredentials credentials) {
         BigQueryWriteClient client;
         try {
-            client = BigQueryWriteClient.create(BigQueryWriteSettings.newBuilder().setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build());
+            BigQueryWriteSettings.Builder builder = BigQueryWriteSettings.newBuilder();
+            builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
+            builder.setEndpoint(endpoint);
+
+            client = BigQueryWriteClient.create(builder.build());
         } catch (Exception e) {
             throw new ProcessException("Failed to create Big Query Write Client for writing", e);
         }
@@ -394,9 +414,13 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
 
     protected StreamWriter createStreamWriter(String streamName, Descriptors.Descriptor descriptor, GoogleCredentials credentials) throws IOException {
         ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
-        return StreamWriter.newBuilder(streamName)
-            .setWriterSchema(protoSchema)
-            .setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();
+
+        StreamWriter.Builder builder = StreamWriter.newBuilder(streamName);
+        builder.setWriterSchema(protoSchema);
+        builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
+        builder.setEndpoint(endpoint);
+
+        return builder.build();
     }
 
     private boolean isBatch() {

Reply via email to