This is an automated email from the ASF dual-hosted git repository.
mbathori pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new eb1339c302 NIFI-14277 Added credential scope in GCP PubSub and
BigQuery processors
eb1339c302 is described below
commit eb1339c302d40c838c2d0739d8099c764237a467
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Feb 19 13:38:34 2025 +0100
NIFI-14277 Added credential scope in GCP PubSub and BigQuery processors
The credential scope is required when a private endpoint is used.
- also used ComponentLog in FlowFileResult in order to emit bulletin error
logs
- also removed separate AbstractGCPubSubWithProxyProcessor which is no
longer needed since PubSubLite processors have been removed
This closes #9728.
Signed-off-by: Mark Bathori <[email protected]>
---
.../gcp/bigquery/AbstractBigQueryProcessor.java | 7 +++
.../gcp/pubsub/AbstractGCPubSubProcessor.java | 43 +++++++++++++++-
.../pubsub/AbstractGCPubSubWithProxyProcessor.java | 57 ----------------------
.../processors/gcp/pubsub/ConsumeGCPubSub.java | 2 +-
.../processors/gcp/pubsub/PublishGCPubSub.java | 8 +--
.../gcp/pubsub/publish/FlowFileResult.java | 10 ++--
.../nifi/processors/gcp/util/GoogleUtils.java | 2 +
7 files changed, 61 insertions(+), 68 deletions(-)
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
index 06494f47f5..10b3b00e57 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -45,6 +45,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_BIGQUERY_SCOPE;
+
/**
* Base class for creating processors that connect to GCP BiqQuery service
*/
@@ -101,6 +103,11 @@ public abstract class AbstractBigQueryProcessor extends
AbstractGCPProcessor<Big
return RELATIONSHIPS;
}
+ @Override
+ protected GoogleCredentials getGoogleCredentials(ProcessContext context) {
+ return
super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_BIGQUERY_SCOPE);
+ }
+
@Override
protected BigQueryOptions getServiceOptions(ProcessContext context,
GoogleCredentials credentials) {
final String projectId =
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
index a8cae49aa6..48c87e9a80 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
@@ -16,10 +16,14 @@
*/
package org.apache.nifi.processors.gcp.pubsub;
+import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ServiceOptions;
-
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
+import io.grpc.HttpConnectProxiedSocketAddress;
+import io.grpc.ProxiedSocketAddress;
+import io.grpc.ProxyDetector;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -29,11 +33,18 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.VerifiableProcessor;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.proxy.ProxyConfiguration;
+import javax.annotation.Nullable;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.SocketAddress;
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
+import static
org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PUBSUB_SCOPE;
+
public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor
implements VerifiableProcessor {
public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new
PropertyDescriptor.Builder()
@@ -120,4 +131,34 @@ public abstract class AbstractGCPubSubProcessor extends
AbstractGCPProcessor imp
return results;
}
+ @Override
+ protected GoogleCredentials getGoogleCredentials(ProcessContext context) {
+ return
super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PUBSUB_SCOPE);
+ }
+
+ protected TransportChannelProvider
getTransportChannelProvider(ProcessContext context) {
+ final ProxyConfiguration proxyConfiguration =
ProxyConfiguration.getConfiguration(context);
+
+ return TopicAdminSettings.defaultGrpcTransportProviderBuilder()
+ .setChannelConfigurator(managedChannelBuilder ->
managedChannelBuilder.proxyDetector(
+ new ProxyDetector() {
+ @Nullable
+ @Override
+ public ProxiedSocketAddress proxyFor(SocketAddress
socketAddress) {
+ if
(Proxy.Type.HTTP.equals(proxyConfiguration.getProxyType())) {
+ return
HttpConnectProxiedSocketAddress.newBuilder()
+
.setUsername(proxyConfiguration.getProxyUserName())
+
.setPassword(proxyConfiguration.getProxyUserPassword())
+ .setProxyAddress(new
InetSocketAddress(proxyConfiguration.getProxyServerHost(),
+
proxyConfiguration.getProxyServerPort()))
+
.setTargetAddress((InetSocketAddress) socketAddress)
+ .build();
+ } else {
+ return null;
+ }
+ }
+ }))
+ .build();
+ }
+
}
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java
deleted file mode 100644
index aed2b945a5..0000000000
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub;
-
-import com.google.api.gax.rpc.TransportChannelProvider;
-import com.google.cloud.pubsub.v1.TopicAdminSettings;
-import io.grpc.HttpConnectProxiedSocketAddress;
-import io.grpc.ProxiedSocketAddress;
-import io.grpc.ProxyDetector;
-import java.net.InetSocketAddress;
-import java.net.Proxy;
-import java.net.SocketAddress;
-import javax.annotation.Nullable;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.proxy.ProxyConfiguration;
-
-public abstract class AbstractGCPubSubWithProxyProcessor extends
AbstractGCPubSubProcessor {
-
- protected TransportChannelProvider
getTransportChannelProvider(ProcessContext context) {
- final ProxyConfiguration proxyConfiguration =
ProxyConfiguration.getConfiguration(context);
-
- return TopicAdminSettings.defaultGrpcTransportProviderBuilder()
- .setChannelConfigurator(managedChannelBuilder ->
managedChannelBuilder.proxyDetector(
- new ProxyDetector() {
- @Nullable
- @Override
- public ProxiedSocketAddress proxyFor(SocketAddress
socketAddress) {
- if
(Proxy.Type.HTTP.equals(proxyConfiguration.getProxyType())) {
- return
HttpConnectProxiedSocketAddress.newBuilder()
-
.setUsername(proxyConfiguration.getProxyUserName())
-
.setPassword(proxyConfiguration.getProxyUserPassword())
- .setProxyAddress(new
InetSocketAddress(proxyConfiguration.getProxyServerHost(),
-
proxyConfiguration.getProxyServerPort()))
-
.setTargetAddress((InetSocketAddress) socketAddress)
- .build();
- } else {
- return null;
- }
- }
- }))
- .build();
- }
-}
\ No newline at end of file
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 4e1be6eb03..8eae56ec59 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -82,7 +82,7 @@ import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_
@WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description =
MSG_PUBLISH_TIME_DESCRIPTION),
@WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description
= DYNAMIC_ATTRIBUTES_DESCRIPTION)
})
-public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor {
+public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
private static final List<String> REQUIRED_PERMISSIONS =
Collections.singletonList("pubsub.subscriptions.consume");
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index 62a80a7a99..fbde3b3d71 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -106,7 +106,7 @@ import static
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_
})
@SystemResourceConsideration(resource = SystemResource.MEMORY, description =
"The entirety of the FlowFile's content "
+ "will be read into memory to be sent as a PubSub message.")
-public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
+public class PublishGCPubSub extends AbstractGCPubSubProcessor {
private static final List<String> REQUIRED_PERMISSIONS =
Collections.singletonList("pubsub.topics.publish");
private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
@@ -333,7 +333,7 @@ public class PublishGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
if (flowFile.getSize() > maxMessageSize) {
final String message = String.format("FlowFile size %d exceeds
MAX_MESSAGE_SIZE", flowFile.getSize());
failures.add(new IllegalArgumentException(message));
- flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures));
+ flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures, getLogger()));
} else {
baos.reset();
session.exportTo(flowFile, baos);
@@ -341,7 +341,7 @@ public class PublishGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
final ApiFuture<String> apiFuture = publishOneMessage(context,
flowFile, baos.toByteArray());
futures.add(apiFuture);
addCallback(apiFuture, new TrackedApiFutureCallback(successes,
failures), executor);
- flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures));
+ flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures, getLogger()));
}
}
finishBatch(session, stopWatch, flowFileResults);
@@ -391,7 +391,7 @@ public class PublishGCPubSub extends
AbstractGCPubSubWithProxyProcessor {
futures.add(apiFuture);
addCallback(apiFuture, new
TrackedApiFutureCallback(successes, failures), executor);
}
- flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures));
+ flowFileResults.add(new FlowFileResult(flowFile, futures,
successes, failures, getLogger()));
}
}
finishBatch(session, stopWatch, flowFileResults);
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
index 99300d1abe..9f81c2f12c 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
@@ -20,11 +20,10 @@ import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.UnavailableException;
import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processors.gcp.pubsub.PubSubAttributes;
import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.Collection;
import java.util.Collections;
@@ -37,21 +36,22 @@ import java.util.concurrent.ExecutionException;
* Tracking of an interaction from NiFi {@link PublishGCPubSub} processor to
Google PubSub endpoint.
*/
public class FlowFileResult {
- private static final Logger logger =
LoggerFactory.getLogger(FlowFileResult.class);
private final FlowFile flowFile;
private final Map<String, String> attributes;
private final List<ApiFuture<String>> futures;
private final List<String> successes;
private final List<Throwable> failures;
+ private final ComponentLog componentLog;
public FlowFileResult(final FlowFile flowFile, final
List<ApiFuture<String>> futures,
- final List<String> successes, final List<Throwable>
failures) {
+ final List<String> successes, final List<Throwable>
failures, final ComponentLog componentLog) {
this.flowFile = flowFile;
this.attributes = new LinkedHashMap<>();
this.futures = futures;
this.successes = successes;
this.failures = failures;
+ this.componentLog = componentLog;
}
/**
@@ -62,7 +62,7 @@ public class FlowFileResult {
try {
ApiFutures.allAsList(futures).get();
} catch (InterruptedException | ExecutionException e) {
- logger.error("Failed to reconcile PubSub send operation
status", e);
+ componentLog.error("Failed to reconcile PubSub send operation
status", e);
}
}
if (futures.size() == successes.size()) {
diff --git
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java
index cebfa4419f..219c909fca 100644
---
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java
+++
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java
@@ -22,6 +22,8 @@ import
org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
public class GoogleUtils {
public static final String GOOGLE_CLOUD_PLATFORM_SCOPE =
"https://www.googleapis.com/auth/cloud-platform";
+ public static final String GOOGLE_CLOUD_PUBSUB_SCOPE =
"https://www.googleapis.com/auth/pubsub";
+ public static final String GOOGLE_CLOUD_BIGQUERY_SCOPE =
"https://www.googleapis.com/auth/bigquery";
/**
* Links to the {@link GCPCredentialsService} which provides credentials
for this particular processor.