This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new 610dbb86eb NIFI-11779 - Override endpoint in PutBigQuery
610dbb86eb is described below
commit 610dbb86ebc15a26e6c238da5a906abfe38f2ce1
Author: Pierre Villard <[email protected]>
AuthorDate: Wed Jul 5 17:45:55 2023 +0200
NIFI-11779 - Override endpoint in PutBigQuery
review
use default from code instead of hardcoding
Signed-off-by: Matt Burgess <[email protected]>
---
.../nifi/processors/gcp/bigquery/PutBigQuery.java | 32 +++++++++++++++++++---
1 file changed, 28 insertions(+), 4 deletions(-)
diff --git a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
index 54105d9592..d3a274b9b1 100644
--- a/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
+++ b/nifi-nar-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/PutBigQuery.java
@@ -41,6 +41,7 @@ import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
+import com.google.cloud.bigquery.storage.v1.stub.BigQueryWriteStubSettings;
import com.google.protobuf.Descriptors;
import com.google.protobuf.DynamicMessage;
import io.grpc.Status;
@@ -118,6 +119,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
private BigQueryWriteClient writeClient = null;
private StreamWriter streamWriter = null;
private String transferType;
+ private String endpoint;
private int maxRetryCount;
private int recordBatchCount;
@@ -126,6 +128,18 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
.required(true)
.build();
+ public static final PropertyDescriptor BIGQUERY_API_ENDPOINT = new
PropertyDescriptor.Builder()
+ .name("bigquery-api-endpoint")
+ .displayName("BigQuery API Endpoint")
+ .description("Can be used to override the default BigQuery endpoint.
Default is "
+ + BigQueryWriteStubSettings.getDefaultEndpoint() + ". "
+ + "Format must be hostname:port.")
+ .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .required(true)
+ .defaultValue(BigQueryWriteStubSettings.getDefaultEndpoint())
+ .build();
+
static final PropertyDescriptor TRANSFER_TYPE = new
PropertyDescriptor.Builder()
.name(TRANSFER_TYPE_NAME)
.displayName("Transfer Type")
@@ -165,6 +179,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
private static final List<PropertyDescriptor> DESCRIPTORS = Stream.of(
GCP_CREDENTIALS_PROVIDER_SERVICE,
PROJECT_ID,
+ BIGQUERY_API_ENDPOINT,
DATASET,
TABLE_NAME,
RECORD_READER,
@@ -186,6 +201,7 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
transferType = context.getProperty(TRANSFER_TYPE).getValue();
maxRetryCount = context.getProperty(RETRY_COUNT).asInteger();
recordBatchCount =
context.getProperty(APPEND_RECORD_COUNT).asInteger();
+ endpoint =
context.getProperty(BIGQUERY_API_ENDPOINT).evaluateAttributeExpressions().getValue();
writeClient = createWriteClient(getGoogleCredentials(context));
}
@@ -384,7 +400,11 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
protected BigQueryWriteClient createWriteClient(GoogleCredentials
credentials) {
BigQueryWriteClient client;
try {
- client =
BigQueryWriteClient.create(BigQueryWriteSettings.newBuilder().setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build());
+ BigQueryWriteSettings.Builder builder =
BigQueryWriteSettings.newBuilder();
+
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
+ builder.setEndpoint(endpoint);
+
+ client = BigQueryWriteClient.create(builder.build());
} catch (Exception e) {
throw new ProcessException("Failed to create Big Query Write
Client for writing", e);
}
@@ -394,9 +414,13 @@ public class PutBigQuery extends AbstractBigQueryProcessor {
protected StreamWriter createStreamWriter(String streamName,
Descriptors.Descriptor descriptor, GoogleCredentials credentials) throws
IOException {
ProtoSchema protoSchema = ProtoSchemaConverter.convert(descriptor);
- return StreamWriter.newBuilder(streamName)
- .setWriterSchema(protoSchema)
-
.setCredentialsProvider(FixedCredentialsProvider.create(credentials)).build();
+
+ StreamWriter.Builder builder = StreamWriter.newBuilder(streamName);
+ builder.setWriterSchema(protoSchema);
+
builder.setCredentialsProvider(FixedCredentialsProvider.create(credentials));
+ builder.setEndpoint(endpoint);
+
+ return builder.build();
}
private boolean isBatch() {