This is an automated email from the ASF dual-hosted git repository.

mbathori pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new eb1339c302 NIFI-14277 Added credential scope in GCP PubSub and 
BigQuery processors
eb1339c302 is described below

commit eb1339c302d40c838c2d0739d8099c764237a467
Author: Peter Turcsanyi <[email protected]>
AuthorDate: Wed Feb 19 13:38:34 2025 +0100

    NIFI-14277 Added credential scope in GCP PubSub and BigQuery processors
    
    The credential scope is required when a private endpoint is used.
    
    - also used ComponentLog in FlowFileResult in order to emit bulletin error 
logs
    - also removed separate AbstractGCPubSubWithProxyProcessor which is no 
longer needed since PubSubLite processors have been removed
    
    This closes #9728.
    
    Signed-off-by: Mark Bathori <[email protected]>
---
 .../gcp/bigquery/AbstractBigQueryProcessor.java    |  7 +++
 .../gcp/pubsub/AbstractGCPubSubProcessor.java      | 43 +++++++++++++++-
 .../pubsub/AbstractGCPubSubWithProxyProcessor.java | 57 ----------------------
 .../processors/gcp/pubsub/ConsumeGCPubSub.java     |  2 +-
 .../processors/gcp/pubsub/PublishGCPubSub.java     |  8 +--
 .../gcp/pubsub/publish/FlowFileResult.java         | 10 ++--
 .../nifi/processors/gcp/util/GoogleUtils.java      |  2 +
 7 files changed, 61 insertions(+), 68 deletions(-)

diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
index 06494f47f5..10b3b00e57 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/bigquery/AbstractBigQueryProcessor.java
@@ -45,6 +45,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static 
org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_BIGQUERY_SCOPE;
+
 /**
  * Base class for creating processors that connect to GCP BiqQuery service
  */
@@ -101,6 +103,11 @@ public abstract class AbstractBigQueryProcessor extends 
AbstractGCPProcessor<Big
         return RELATIONSHIPS;
     }
 
+    @Override
+    protected GoogleCredentials getGoogleCredentials(ProcessContext context) {
+        return 
super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_BIGQUERY_SCOPE);
+    }
+
     @Override
     protected BigQueryOptions getServiceOptions(ProcessContext context, 
GoogleCredentials credentials) {
         final String projectId = 
context.getProperty(PROJECT_ID).evaluateAttributeExpressions().getValue();
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
index a8cae49aa6..48c87e9a80 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubProcessor.java
@@ -16,10 +16,14 @@
  */
 package org.apache.nifi.processors.gcp.pubsub;
 
+import com.google.api.gax.rpc.TransportChannelProvider;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.ServiceOptions;
-
+import com.google.cloud.pubsub.v1.TopicAdminSettings;
 import com.google.cloud.pubsub.v1.stub.PublisherStubSettings;
+import io.grpc.HttpConnectProxiedSocketAddress;
+import io.grpc.ProxiedSocketAddress;
+import io.grpc.ProxyDetector;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -29,11 +33,18 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.VerifiableProcessor;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.gcp.AbstractGCPProcessor;
+import org.apache.nifi.proxy.ProxyConfiguration;
 
+import javax.annotation.Nullable;
+import java.net.InetSocketAddress;
+import java.net.Proxy;
+import java.net.SocketAddress;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Set;
 
+import static 
org.apache.nifi.processors.gcp.util.GoogleUtils.GOOGLE_CLOUD_PUBSUB_SCOPE;
+
 public abstract class AbstractGCPubSubProcessor extends AbstractGCPProcessor 
implements VerifiableProcessor {
 
     public static final PropertyDescriptor BATCH_SIZE_THRESHOLD = new 
PropertyDescriptor.Builder()
@@ -120,4 +131,34 @@ public abstract class AbstractGCPubSubProcessor extends 
AbstractGCPProcessor imp
         return results;
     }
 
+    @Override
+    protected GoogleCredentials getGoogleCredentials(ProcessContext context) {
+        return 
super.getGoogleCredentials(context).createScoped(GOOGLE_CLOUD_PUBSUB_SCOPE);
+    }
+
+    protected TransportChannelProvider 
getTransportChannelProvider(ProcessContext context) {
+        final ProxyConfiguration proxyConfiguration = 
ProxyConfiguration.getConfiguration(context);
+
+        return TopicAdminSettings.defaultGrpcTransportProviderBuilder()
+                .setChannelConfigurator(managedChannelBuilder -> 
managedChannelBuilder.proxyDetector(
+                        new ProxyDetector() {
+                            @Nullable
+                            @Override
+                            public ProxiedSocketAddress proxyFor(SocketAddress 
socketAddress) {
+                                if 
(Proxy.Type.HTTP.equals(proxyConfiguration.getProxyType())) {
+                                    return 
HttpConnectProxiedSocketAddress.newBuilder()
+                                            
.setUsername(proxyConfiguration.getProxyUserName())
+                                            
.setPassword(proxyConfiguration.getProxyUserPassword())
+                                            .setProxyAddress(new 
InetSocketAddress(proxyConfiguration.getProxyServerHost(),
+                                                    
proxyConfiguration.getProxyServerPort()))
+                                            
.setTargetAddress((InetSocketAddress) socketAddress)
+                                            .build();
+                                } else {
+                                    return null;
+                                }
+                            }
+                        }))
+                .build();
+    }
+
 }
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java
deleted file mode 100644
index aed2b945a5..0000000000
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/AbstractGCPubSubWithProxyProcessor.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.processors.gcp.pubsub;
-
-import com.google.api.gax.rpc.TransportChannelProvider;
-import com.google.cloud.pubsub.v1.TopicAdminSettings;
-import io.grpc.HttpConnectProxiedSocketAddress;
-import io.grpc.ProxiedSocketAddress;
-import io.grpc.ProxyDetector;
-import java.net.InetSocketAddress;
-import java.net.Proxy;
-import java.net.SocketAddress;
-import javax.annotation.Nullable;
-import org.apache.nifi.processor.ProcessContext;
-import org.apache.nifi.proxy.ProxyConfiguration;
-
-public abstract class AbstractGCPubSubWithProxyProcessor extends 
AbstractGCPubSubProcessor {
-
-    protected TransportChannelProvider 
getTransportChannelProvider(ProcessContext context) {
-        final ProxyConfiguration proxyConfiguration = 
ProxyConfiguration.getConfiguration(context);
-
-        return TopicAdminSettings.defaultGrpcTransportProviderBuilder()
-                .setChannelConfigurator(managedChannelBuilder -> 
managedChannelBuilder.proxyDetector(
-                        new ProxyDetector() {
-                            @Nullable
-                            @Override
-                            public ProxiedSocketAddress proxyFor(SocketAddress 
socketAddress) {
-                                if 
(Proxy.Type.HTTP.equals(proxyConfiguration.getProxyType())) {
-                                    return 
HttpConnectProxiedSocketAddress.newBuilder()
-                                            
.setUsername(proxyConfiguration.getProxyUserName())
-                                            
.setPassword(proxyConfiguration.getProxyUserPassword())
-                                            .setProxyAddress(new 
InetSocketAddress(proxyConfiguration.getProxyServerHost(),
-                                                    
proxyConfiguration.getProxyServerPort()))
-                                            
.setTargetAddress((InetSocketAddress) socketAddress)
-                                            .build();
-                                } else {
-                                    return null;
-                                }
-                            }
-                        }))
-                .build();
-    }
-}
\ No newline at end of file
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
index 4e1be6eb03..8eae56ec59 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/ConsumeGCPubSub.java
@@ -82,7 +82,7 @@ import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.SERIALIZED_
         @WritesAttribute(attribute = MSG_PUBLISH_TIME_ATTRIBUTE, description = 
MSG_PUBLISH_TIME_DESCRIPTION),
         @WritesAttribute(attribute = DYNAMIC_ATTRIBUTES_ATTRIBUTE, description 
= DYNAMIC_ATTRIBUTES_DESCRIPTION)
 })
-public class ConsumeGCPubSub extends AbstractGCPubSubWithProxyProcessor {
+public class ConsumeGCPubSub extends AbstractGCPubSubProcessor {
 
     private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.subscriptions.consume");
 
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
index 62a80a7a99..fbde3b3d71 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/PublishGCPubSub.java
@@ -106,7 +106,7 @@ import static 
org.apache.nifi.processors.gcp.pubsub.PubSubAttributes.TOPIC_NAME_
 })
 @SystemResourceConsideration(resource = SystemResource.MEMORY, description = 
"The entirety of the FlowFile's content "
         + "will be read into memory to be sent as a PubSub message.")
-public class PublishGCPubSub extends AbstractGCPubSubWithProxyProcessor {
+public class PublishGCPubSub extends AbstractGCPubSubProcessor {
     private static final List<String> REQUIRED_PERMISSIONS = 
Collections.singletonList("pubsub.topics.publish");
     private static final String TRANSIT_URI_FORMAT_STRING = "gcp://%s";
 
@@ -333,7 +333,7 @@ public class PublishGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
             if (flowFile.getSize() > maxMessageSize) {
                 final String message = String.format("FlowFile size %d exceeds 
MAX_MESSAGE_SIZE", flowFile.getSize());
                 failures.add(new IllegalArgumentException(message));
-                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures));
+                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures, getLogger()));
             } else {
                 baos.reset();
                 session.exportTo(flowFile, baos);
@@ -341,7 +341,7 @@ public class PublishGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
                 final ApiFuture<String> apiFuture = publishOneMessage(context, 
flowFile, baos.toByteArray());
                 futures.add(apiFuture);
                 addCallback(apiFuture, new TrackedApiFutureCallback(successes, 
failures), executor);
-                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures));
+                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures, getLogger()));
             }
         }
         finishBatch(session, stopWatch, flowFileResults);
@@ -391,7 +391,7 @@ public class PublishGCPubSub extends 
AbstractGCPubSubWithProxyProcessor {
                     futures.add(apiFuture);
                     addCallback(apiFuture, new 
TrackedApiFutureCallback(successes, failures), executor);
                 }
-                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures));
+                flowFileResults.add(new FlowFileResult(flowFile, futures, 
successes, failures, getLogger()));
             }
         }
         finishBatch(session, stopWatch, flowFileResults);
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
index 99300d1abe..9f81c2f12c 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/pubsub/publish/FlowFileResult.java
@@ -20,11 +20,10 @@ import com.google.api.core.ApiFuture;
 import com.google.api.core.ApiFutures;
 import com.google.api.gax.rpc.UnavailableException;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processors.gcp.pubsub.PubSubAttributes;
 import org.apache.nifi.processors.gcp.pubsub.PublishGCPubSub;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -37,21 +36,22 @@ import java.util.concurrent.ExecutionException;
  * Tracking of an interaction from NiFi {@link PublishGCPubSub} processor to 
Google PubSub endpoint.
  */
 public class FlowFileResult {
-    private static final Logger logger = 
LoggerFactory.getLogger(FlowFileResult.class);
 
     private final FlowFile flowFile;
     private final Map<String, String> attributes;
     private final List<ApiFuture<String>> futures;
     private final List<String> successes;
     private final List<Throwable> failures;
+    private final ComponentLog componentLog;
 
     public FlowFileResult(final FlowFile flowFile, final 
List<ApiFuture<String>> futures,
-                          final List<String> successes, final List<Throwable> 
failures) {
+                          final List<String> successes, final List<Throwable> 
failures, final ComponentLog componentLog) {
         this.flowFile = flowFile;
         this.attributes = new LinkedHashMap<>();
         this.futures = futures;
         this.successes = successes;
         this.failures = failures;
+        this.componentLog = componentLog;
     }
 
     /**
@@ -62,7 +62,7 @@ public class FlowFileResult {
             try {
                 ApiFutures.allAsList(futures).get();
             } catch (InterruptedException | ExecutionException e) {
-                logger.error("Failed to reconcile PubSub send operation 
status", e);
+                componentLog.error("Failed to reconcile PubSub send operation 
status", e);
             }
         }
         if (futures.size() == successes.size()) {
diff --git 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java
 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java
index cebfa4419f..219c909fca 100644
--- 
a/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java
+++ 
b/nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/util/GoogleUtils.java
@@ -22,6 +22,8 @@ import 
org.apache.nifi.gcp.credentials.service.GCPCredentialsService;
 public class GoogleUtils {
 
     public static final String GOOGLE_CLOUD_PLATFORM_SCOPE = 
"https://www.googleapis.com/auth/cloud-platform";;
+    public static final String GOOGLE_CLOUD_PUBSUB_SCOPE = 
"https://www.googleapis.com/auth/pubsub";;
+    public static final String GOOGLE_CLOUD_BIGQUERY_SCOPE = 
"https://www.googleapis.com/auth/bigquery";;
 
     /**
      * Links to the {@link GCPCredentialsService} which provides credentials 
for this particular processor.

Reply via email to