turcsanyip commented on code in PR #10485:
URL: https://github.com/apache/nifi/pull/10485#discussion_r2527503653
##########
nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-services-api/pom.xml:
##########
@@ -26,6 +26,10 @@
<packaging>jar</packaging>
<dependencies>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-oauth2-provider-api</artifactId>
+ </dependency>
Review Comment:
The services API module no longer depends on `nifi-oauth2-provider-api`, so
this dependency should be removed.
##########
nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/factory/CredentialPropertyDescriptors.java:
##########
@@ -79,12 +80,65 @@ private CredentialPropertyDescriptors() { }
.sensitive(true)
.build();
+ private static final String DEFAULT_WORKLOAD_IDENTITY_SCOPE =
"https://www.googleapis.com/auth/cloud-platform";
+ private static final String DEFAULT_WORKLOAD_IDENTITY_TOKEN_ENDPOINT =
"https://sts.googleapis.com/v1/token";
+ private static final String DEFAULT_WORKLOAD_IDENTITY_SUBJECT_TOKEN_TYPE =
"urn:ietf:params:oauth:token-type:jwt";
+
+ public static final PropertyDescriptor WORKLOAD_IDENTITY_AUDIENCE = new
PropertyDescriptor.Builder()
+ .name("Audience")
+ .description("The audience corresponding to the target Workload
Identity Provider, typically the full resource name.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(AUTHENTICATION_STRATEGY,
AuthenticationStrategy.WORKLOAD_IDENTITY_FEDERATION.getValue())
+ .build();
+
+ public static final PropertyDescriptor WORKLOAD_IDENTITY_SCOPE = new
PropertyDescriptor.Builder()
+ .name("Scope")
+ .description("OAuth2 scopes requested for the exchanged access
token. Multiple scopes can be separated by space or comma.")
+ .required(true)
+ .defaultValue(DEFAULT_WORKLOAD_IDENTITY_SCOPE)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(AUTHENTICATION_STRATEGY,
AuthenticationStrategy.WORKLOAD_IDENTITY_FEDERATION.getValue())
+ .build();
+
+ public static final PropertyDescriptor WORKLOAD_IDENTITY_TOKEN_ENDPOINT =
new PropertyDescriptor.Builder()
+ .name("STS Token Endpoint")
+ .description("Google Security Token Service endpoint used for
token exchange.")
+ .required(false)
+ .defaultValue(DEFAULT_WORKLOAD_IDENTITY_TOKEN_ENDPOINT)
+ .addValidator(StandardValidators.URL_VALIDATOR)
+ .dependsOn(AUTHENTICATION_STRATEGY,
AuthenticationStrategy.WORKLOAD_IDENTITY_FEDERATION.getValue())
+ .build();
+
+ public static final PropertyDescriptor
WORKLOAD_IDENTITY_SUBJECT_TOKEN_PROVIDER = new PropertyDescriptor.Builder()
+ .name("Subject Token Provider")
+ .description("Controller Service that retrieves the external
workload identity token to exchange.")
+ .identifiesControllerService(OAuth2AccessTokenProvider.class)
+ .required(true)
+ .dependsOn(AUTHENTICATION_STRATEGY,
AuthenticationStrategy.WORKLOAD_IDENTITY_FEDERATION.getValue())
+ .build();
+
+ public static final PropertyDescriptor
WORKLOAD_IDENTITY_SUBJECT_TOKEN_TYPE = new PropertyDescriptor.Builder()
+ .name("Subject Token Type")
+ .description("The type of token returned by the Subject Token
Provider.")
+ .required(true)
+ .defaultValue(DEFAULT_WORKLOAD_IDENTITY_SUBJECT_TOKEN_TYPE)
+ .allowableValues(
+ DEFAULT_WORKLOAD_IDENTITY_SUBJECT_TOKEN_TYPE,
+ "urn:ietf:params:oauth:token-type:access_token"
Review Comment:
The default GCP STS endpoint does not appear to support the `access_token`
type (it returns an error for me). It does, however, accept
`urn:ietf:params:oauth:token-type:id_token`, so I think we should add the
`id_token` option. We could also keep `access_token` in case other endpoints
support it, though I'm unsure about that.
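A minimal sketch of the resulting value set, assuming the three token-type
URNs are held in constants. The class and constant names here are
hypothetical; in the PR the values would be passed to `.allowableValues(...)`
on the `Subject Token Type` descriptor.

```java
import java.util.List;

// Hypothetical holder class, used only to illustrate the suggested value set.
final class SubjectTokenTypes {
    static final String JWT = "urn:ietf:params:oauth:token-type:jwt";
    static final String ID_TOKEN = "urn:ietf:params:oauth:token-type:id_token";
    static final String ACCESS_TOKEN = "urn:ietf:params:oauth:token-type:access_token";

    private SubjectTokenTypes() { }

    // Values that would be offered in the UI; JWT stays first as the current default.
    static List<String> allowableValues() {
        return List.of(JWT, ID_TOKEN, ACCESS_TOKEN);
    }
}
```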
##########
nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/factory/CredentialPropertyDescriptors.java:
##########
@@ -79,12 +80,65 @@ private CredentialPropertyDescriptors() { }
.sensitive(true)
.build();
+ private static final String DEFAULT_WORKLOAD_IDENTITY_SCOPE =
"https://www.googleapis.com/auth/cloud-platform";
+ private static final String DEFAULT_WORKLOAD_IDENTITY_TOKEN_ENDPOINT =
"https://sts.googleapis.com/v1/token";
+ private static final String DEFAULT_WORKLOAD_IDENTITY_SUBJECT_TOKEN_TYPE =
"urn:ietf:params:oauth:token-type:jwt";
+
+ public static final PropertyDescriptor WORKLOAD_IDENTITY_AUDIENCE = new
PropertyDescriptor.Builder()
+ .name("Audience")
+ .description("The audience corresponding to the target Workload
Identity Provider, typically the full resource name.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(AUTHENTICATION_STRATEGY,
AuthenticationStrategy.WORKLOAD_IDENTITY_FEDERATION.getValue())
+ .build();
+
+ public static final PropertyDescriptor WORKLOAD_IDENTITY_SCOPE = new
PropertyDescriptor.Builder()
+ .name("Scope")
+ .description("OAuth2 scopes requested for the exchanged access
token. Multiple scopes can be separated by space or comma.")
+ .required(true)
+ .defaultValue(DEFAULT_WORKLOAD_IDENTITY_SCOPE)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .dependsOn(AUTHENTICATION_STRATEGY,
AuthenticationStrategy.WORKLOAD_IDENTITY_FEDERATION.getValue())
+ .build();
+
+ public static final PropertyDescriptor WORKLOAD_IDENTITY_TOKEN_ENDPOINT =
new PropertyDescriptor.Builder()
+ .name("STS Token Endpoint")
+ .description("Google Security Token Service endpoint used for
token exchange.")
+ .required(false)
+ .defaultValue(DEFAULT_WORKLOAD_IDENTITY_TOKEN_ENDPOINT)
Review Comment:
`STS Token Endpoint` is effectively mandatory (it has a default value and is
used in the code without null-check), so I would declare it with
`required(true)` for the UI as well.
##########
nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService/additionalDetails.md:
##########
@@ -0,0 +1,168 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# GCPCredentialsControllerService
+
+The **GCPCredentialsControllerService** centralizes all authentication
strategies used by NiFi components that
+interact with Google Cloud. Each strategy exposes only the properties it
requires, which lets administrators swap
+approaches without touching downstream processors. This guide summarizes every
supported strategy and retains the
+full Workload Identity Federation instructions that previously lived in the
dedicated controller service.
+
+---
+
+## Application Default Credentials
+
+Application Default Credentials (ADC) allow NiFi to inherit credentials
exposed through the runtime environment,
+including:
+- The `GOOGLE_APPLICATION_CREDENTIALS` environment variable referencing a
service-account key file
+- `gcloud auth application-default login` on development machines
+- Cloud Shell or other Google-managed environments that inject ADC
automatically
+
+No extra properties are required. Confirm that the account supplying the ADC
token has the IAM roles needed by the
+processors referencing this controller service.
+
+---
+
+## Service Account Credentials (JSON File)
+
+Use this strategy when the service-account key material is stored on disk.
Configure the **Service Account JSON
+File** property to point at the JSON key file. NiFi reads the file when the
controller service is enabled and
+caches the Google credentials.
+
+**Best practices**
+- Restrict filesystem permissions so only the NiFi service user can read the
key.
+- Rotate keys regularly and delete unused keys from the Google Cloud Console.
+- When impersonating a domain user, set **Delegation Strategy** to *Delegated
Account* and provide **Delegation
+ User** so that NiFi calls Google APIs on behalf of that user.
+
+---
+
+## Service Account Credentials (JSON Value)
+
+This strategy embeds the entire service-account JSON document directly inside
the controller-service property. The
+value is marked sensitive and can be injected through Parameter Contexts to
separate credentials from flow
+definitions.
+
+**Best practices**
+- Store the JSON value in a Parameter Context referenced by this property so
you can swap credentials per
+ environment.
+- Use NiFi’s Sensitive Property encryption in `nifi.properties` to encrypt the
stored JSON on disk.
+
+---
+
+## Compute Engine Credentials
+
+Select **Compute Engine Credentials** when NiFi runs on a Google-managed
runtime (Compute Engine, GKE, etc.) and
+should use the instance’s attached service account. Google automatically
refreshes the metadata server tokens, so
+no additional properties are required.
+
+**Best practices**
+- Grant the instance service account only the roles required by your flows.
+- If multiple NiFi nodes share the same instance template, verify that all
nodes have access to the same IAM
+ permissions or configure Workload Identity Federation for finer control.
+
+---
+
+## Workload Identity Federation
+
+Workload Identity Federation (WIF) exchanges an external identity-provider
token for a short-lived Google Cloud
+access token via Google’s Security Token Service (STS). The controller service
configures Google’s
+`IdentityPoolCredentials`, allowing Google client libraries to refresh Google
Cloud tokens automatically. This is
+functionally identical to the retired
`StandardGCPIdentityFederationTokenProvider`, but all properties now reside
+on GCPCredentialsControllerService itself.
Review Comment:
Since `StandardGCPIdentityFederationTokenProvider` was not released, it does
not need to be mentioned.
```suggestion
`IdentityPoolCredentials`, allowing Google client libraries to refresh
Google Cloud tokens automatically.
```
##########
nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/resources/docs/org.apache.nifi.processors.gcp.credentials.service.GCPCredentialsControllerService/additionalDetails.md:
##########
@@ -0,0 +1,168 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+# GCPCredentialsControllerService
+
+The **GCPCredentialsControllerService** centralizes all authentication
strategies used by NiFi components that
+interact with Google Cloud. Each strategy exposes only the properties it
requires, which lets administrators swap
+approaches without touching downstream processors. This guide summarizes every
supported strategy and retains the
+full Workload Identity Federation instructions that previously lived in the
dedicated controller service.
Review Comment:
Since `StandardGCPIdentityFederationTokenProvider` was not released, it does
not need to be mentioned.
```suggestion
approaches without touching downstream processors. This guide summarizes
every supported strategy.
```
##########
nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/factory/strategies/WorkloadIdentityFederationCredentialsStrategy.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.credentials.factory.strategies;
+
+import com.google.auth.http.HttpTransportFactory;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.IdentityPoolCredentials;
+import com.google.auth.oauth2.IdentityPoolSubjectTokenSupplier;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Credentials strategy that configures Workload Identity Federation using
Google Identity Pool credentials.
+ */
+public class WorkloadIdentityFederationCredentialsStrategy extends
AbstractCredentialsStrategy {
+
+ private static final long SUBJECT_TOKEN_REFRESH_SKEW_SECONDS = 60;
+ private static final String ERROR_NO_SUBJECT_TOKEN = "Subject token
provider returned no usable token";
+
+ public WorkloadIdentityFederationCredentialsStrategy() {
+ super("Workload Identity Federation");
+ }
+
+ @Override
+ public GoogleCredentials getGoogleCredentials(final
Map<PropertyDescriptor, String> properties, final HttpTransportFactory
transportFactory) {
+ throw new UnsupportedOperationException("Workload Identity Federation
requires the controller service configuration context");
+ }
+
+ @Override
+ public GoogleCredentials getGoogleCredentials(final ConfigurationContext
context, final HttpTransportFactory transportFactory) throws IOException {
+ final OAuth2AccessTokenProvider subjectTokenProvider =
context.getProperty(CredentialPropertyDescriptors.WORKLOAD_IDENTITY_SUBJECT_TOKEN_PROVIDER)
+ .asControllerService(OAuth2AccessTokenProvider.class);
+ final String audience =
context.getProperty(CredentialPropertyDescriptors.WORKLOAD_IDENTITY_AUDIENCE).getValue();
+ final String scopeValue =
context.getProperty(CredentialPropertyDescriptors.WORKLOAD_IDENTITY_SCOPE).getValue();
+ final String tokenEndpoint =
context.getProperty(CredentialPropertyDescriptors.WORKLOAD_IDENTITY_TOKEN_ENDPOINT).getValue();
+ final String subjectTokenType =
context.getProperty(CredentialPropertyDescriptors.WORKLOAD_IDENTITY_SUBJECT_TOKEN_TYPE).getValue();
+
+ final List<String> scopes = parseScopes(scopeValue);
+ final IdentityPoolSubjectTokenSupplier tokenSupplier =
createSubjectTokenSupplier(subjectTokenProvider);
+ final IdentityPoolCredentials.Builder builder =
IdentityPoolCredentials.newBuilder()
+ .setAudience(audience)
+ .setTokenUrl(tokenEndpoint)
+ .setSubjectTokenType(subjectTokenType)
+ .setSubjectTokenSupplier(tokenSupplier);
+
+ if (!scopes.isEmpty()) {
+ builder.setScopes(scopes);
+ }
+
+ if (transportFactory != null) {
+ builder.setHttpTransportFactory(transportFactory);
+ }
+
+ return builder.build();
+ }
+
+ private IdentityPoolSubjectTokenSupplier createSubjectTokenSupplier(final
OAuth2AccessTokenProvider tokenProvider) {
+ return context -> getSubjectToken(tokenProvider);
+ }
+
+ private String getSubjectToken(final OAuth2AccessTokenProvider
tokenProvider) throws IOException {
+ AccessToken subjectToken = tokenProvider.getAccessDetails();
+
+ if (requiresRefresh(subjectToken)) {
+ tokenProvider.refreshAccessDetails();
+ subjectToken = tokenProvider.getAccessDetails();
+ }
+
+ final String subjectTokenValue = extractTokenValue(subjectToken);
+ if (StringUtils.isBlank(subjectTokenValue)) {
+ throw new IOException(ERROR_NO_SUBJECT_TOKEN);
+ }
+
+ return subjectTokenValue;
+ }
+
+ private boolean requiresRefresh(final AccessToken subjectToken) {
+ if (subjectToken == null) {
+ return true;
+ }
+
+ final long expiresIn = subjectToken.getExpiresIn();
+ if (expiresIn <= 0) {
+ return false;
+ }
+
+ final Instant fetchTime = subjectToken.getFetchTime();
+ final Instant refreshTime =
fetchTime.plusSeconds(expiresIn).minusSeconds(SUBJECT_TOKEN_REFRESH_SKEW_SECONDS);
+ return Instant.now().isAfter(refreshTime);
+ }
+
+ private String extractTokenValue(final AccessToken subjectToken) {
+ if (subjectToken == null) {
+ return null;
+ }
+
+ final Map<String, Object> additionalParameters =
subjectToken.getAdditionalParameters();
+ if (additionalParameters != null) {
+ final Object idToken = additionalParameters.get("id_token");
Review Comment:
I suggest adding a constant for `"id_token"`.
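A rough sketch of what I mean. The rest of `extractTokenValue` is not visible
in this hunk, so the fallback to the access token value is an assumption, and
the class, constant, and parameter names are hypothetical.

```java
import java.util.Map;

// Hypothetical standalone version of the lookup, for illustration only.
final class SubjectTokenExtractor {
    // Named constant replacing the inline "id_token" literal.
    private static final String ID_TOKEN_PARAMETER = "id_token";

    private SubjectTokenExtractor() { }

    // Prefers a non-blank id_token from the additional parameters;
    // otherwise returns the supplied access token value (assumed fallback).
    static String extractTokenValue(final Map<String, Object> additionalParameters, final String accessToken) {
        if (additionalParameters != null) {
            final Object idToken = additionalParameters.get(ID_TOKEN_PARAMETER);
            if (idToken instanceof String && !((String) idToken).trim().isEmpty()) {
                return (String) idToken;
            }
        }
        return accessToken;
    }
}
```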
##########
nifi-extension-bundles/nifi-gcp-bundle/nifi-gcp-processors/src/main/java/org/apache/nifi/processors/gcp/credentials/factory/strategies/WorkloadIdentityFederationCredentialsStrategy.java:
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.gcp.credentials.factory.strategies;
+
+import com.google.auth.http.HttpTransportFactory;
+import com.google.auth.oauth2.GoogleCredentials;
+import com.google.auth.oauth2.IdentityPoolCredentials;
+import com.google.auth.oauth2.IdentityPoolSubjectTokenSupplier;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.oauth2.AccessToken;
+import org.apache.nifi.oauth2.OAuth2AccessTokenProvider;
+import
org.apache.nifi.processors.gcp.credentials.factory.CredentialPropertyDescriptors;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Credentials strategy that configures Workload Identity Federation using
Google Identity Pool credentials.
+ */
+public class WorkloadIdentityFederationCredentialsStrategy extends
AbstractCredentialsStrategy {
+
+ private static final long SUBJECT_TOKEN_REFRESH_SKEW_SECONDS = 60;
+ private static final String ERROR_NO_SUBJECT_TOKEN = "Subject token
provider returned no usable token";
+
+ public WorkloadIdentityFederationCredentialsStrategy() {
+ super("Workload Identity Federation");
+ }
+
+ @Override
+ public GoogleCredentials getGoogleCredentials(final
Map<PropertyDescriptor, String> properties, final HttpTransportFactory
transportFactory) {
+ throw new UnsupportedOperationException("Workload Identity Federation
requires the controller service configuration context");
+ }
+
+ @Override
+ public GoogleCredentials getGoogleCredentials(final ConfigurationContext
context, final HttpTransportFactory transportFactory) throws IOException {
+ final OAuth2AccessTokenProvider subjectTokenProvider =
context.getProperty(CredentialPropertyDescriptors.WORKLOAD_IDENTITY_SUBJECT_TOKEN_PROVIDER)
+ .asControllerService(OAuth2AccessTokenProvider.class);
+ final String audience =
context.getProperty(CredentialPropertyDescriptors.WORKLOAD_IDENTITY_AUDIENCE).getValue();
+ final String scopeValue =
context.getProperty(CredentialPropertyDescriptors.WORKLOAD_IDENTITY_SCOPE).getValue();
+ final String tokenEndpoint =
context.getProperty(CredentialPropertyDescriptors.WORKLOAD_IDENTITY_TOKEN_ENDPOINT).getValue();
+ final String subjectTokenType =
context.getProperty(CredentialPropertyDescriptors.WORKLOAD_IDENTITY_SUBJECT_TOKEN_TYPE).getValue();
+
+ final List<String> scopes = parseScopes(scopeValue);
+ final IdentityPoolSubjectTokenSupplier tokenSupplier =
createSubjectTokenSupplier(subjectTokenProvider);
+ final IdentityPoolCredentials.Builder builder =
IdentityPoolCredentials.newBuilder()
+ .setAudience(audience)
+ .setTokenUrl(tokenEndpoint)
+ .setSubjectTokenType(subjectTokenType)
+ .setSubjectTokenSupplier(tokenSupplier);
+
+ if (!scopes.isEmpty()) {
+ builder.setScopes(scopes);
+ }
+
+ if (transportFactory != null) {
+ builder.setHttpTransportFactory(transportFactory);
+ }
+
+ return builder.build();
+ }
+
+ private IdentityPoolSubjectTokenSupplier createSubjectTokenSupplier(final
OAuth2AccessTokenProvider tokenProvider) {
+ return context -> getSubjectToken(tokenProvider);
+ }
+
+ private String getSubjectToken(final OAuth2AccessTokenProvider
tokenProvider) throws IOException {
+ AccessToken subjectToken = tokenProvider.getAccessDetails();
+
+ if (requiresRefresh(subjectToken)) {
+ tokenProvider.refreshAccessDetails();
Review Comment:
`OAuth2AccessTokenProvider` implementations are required to return a valid
access token according to the [API
docs](https://github.com/apache/nifi/blob/b5a2ba02161f93bab344d029945505d2a7479fcd/nifi-extension-bundles/nifi-standard-services/nifi-oauth2-provider-api/src/main/java/org/apache/nifi/oauth2/OAuth2AccessTokenProvider.java#L27),
and both `StandardOauth2AccessTokenProvider` and
`JWTBearerOAuth2AccessTokenProvider` have a `Refresh Window` property to
configure the refresh skew time. Is there a specific edge case that requires
handling the refresh on the client side as well?
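If there is no such edge case, the supplier could shrink to something like
the sketch below. `TokenProvider` is a hypothetical stand-in for
`OAuth2AccessTokenProvider` that exposes only the token value, so this is an
illustration of the idea rather than the actual NiFi API.

```java
import java.io.IOException;

// Hypothetical simplified supplier that trusts the provider to handle refresh.
final class SubjectTokenSupplierSketch {

    // Stand-in for OAuth2AccessTokenProvider; assumed to always return a current token.
    interface TokenProvider {
        String getAccessToken();
    }

    private SubjectTokenSupplierSketch() { }

    // No client-side expiry check: only reject a missing or blank token.
    static String getSubjectToken(final TokenProvider provider) throws IOException {
        final String token = provider.getAccessToken();
        if (token == null || token.trim().isEmpty()) {
            throw new IOException("Subject token provider returned no usable token");
        }
        return token;
    }
}
```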