This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
     new 04b8587dfd NIFI-12939: Retry Kerberos login on authentication failure in Iceberg processors
04b8587dfd is described below

commit 04b8587dfdf4c785b87035b64ff4ec1eb983645e
Author: Mark Bathori <[email protected]>
AuthorDate: Sun Mar 24 20:57:32 2024 +0100

    NIFI-12939: Retry Kerberos login on authentication failure in Iceberg processors
    
    Update rollback and context yield calls
    
    Signed-off-by: Matt Burgess <[email protected]>
---
 .../iceberg/AbstractIcebergProcessor.java          | 41 ++++++++++------------
 .../nifi/processors/iceberg/IcebergUtils.java      | 18 ++++++++++
 .../apache/nifi/processors/iceberg/PutIceberg.java | 31 ++++++++++++----
 3 files changed, 61 insertions(+), 29 deletions(-)

diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
index 3352959d17..8fcc8ac7d4 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/AbstractIcebergProcessor.java
@@ -25,18 +25,19 @@ import 
org.apache.nifi.components.ClassloaderIsolationKeyProvider;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.hadoop.SecurityUtil;
 import org.apache.nifi.kerberos.KerberosUserService;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.security.krb.KerberosLoginException;
 import org.apache.nifi.security.krb.KerberosUser;
 import org.apache.nifi.services.iceberg.IcebergCatalogService;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.nifi.hadoop.SecurityUtil.getUgiForKerberosUser;
 import static 
org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
@@ -67,36 +68,33 @@ public abstract class AbstractIcebergProcessor extends 
AbstractProcessor impleme
             .description("A FlowFile is routed to this relationship if the 
operation failed and retrying the operation will also fail, such as an invalid 
data or schema.")
             .build();
 
-    private volatile KerberosUser kerberosUser;
-    private volatile UserGroupInformation ugi;
+    protected final AtomicReference<KerberosUser> kerberosUserReference = new 
AtomicReference<>();
+    protected final AtomicReference<UserGroupInformation> ugiReference = new 
AtomicReference<>();
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-        final IcebergCatalogService catalogService = 
context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
+        initKerberosCredentials(context);
+    }
+
+    protected void initKerberosCredentials(ProcessContext context) {
         final KerberosUserService kerberosUserService = 
context.getProperty(KERBEROS_USER_SERVICE).asControllerService(KerberosUserService.class);
+        final IcebergCatalogService catalogService = 
context.getProperty(CATALOG).asControllerService(IcebergCatalogService.class);
 
         if (kerberosUserService != null) {
-            this.kerberosUser = kerberosUserService.createKerberosUser();
+            final KerberosUser kerberosUser = 
kerberosUserService.createKerberosUser();
+            kerberosUserReference.set(kerberosUser);
             try {
-                this.ugi = 
getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()),
 kerberosUser);
+                
ugiReference.set(getUgiForKerberosUser(getConfigurationFromFiles(catalogService.getConfigFilePaths()),
 kerberosUser));
             } catch (IOException e) {
-                throw new ProcessException("Kerberos Authentication failed", 
e);
+                throw new ProcessException("Kerberos authentication failed", 
e);
             }
         }
     }
 
     @OnStopped
     public void onStopped() {
-        if (kerberosUser != null) {
-            try {
-                kerberosUser.logout();
-            } catch (KerberosLoginException e) {
-                getLogger().error("Error logging out kerberos user", e);
-            } finally {
-                kerberosUser = null;
-                ugi = null;
-            }
-        }
+        kerberosUserReference.set(null);
+        ugiReference.set(null);
     }
 
     @Override
@@ -106,6 +104,7 @@ public abstract class AbstractIcebergProcessor extends 
AbstractProcessor impleme
             return;
         }
 
+        final KerberosUser kerberosUser = kerberosUserReference.get();
         if (kerberosUser == null) {
             doOnTrigger(context, session, flowFile);
         } else {
@@ -132,12 +131,8 @@ public abstract class AbstractIcebergProcessor extends 
AbstractProcessor impleme
     }
 
     private UserGroupInformation getUgi() {
-        try {
-            kerberosUser.checkTGTAndRelogin();
-        } catch (KerberosLoginException e) {
-            throw new ProcessException("Unable to re-login with kerberos 
credentials for " + kerberosUser.getPrincipal(), e);
-        }
-        return ugi;
+        SecurityUtil.checkTGTAndRelogin(getLogger(), 
kerberosUserReference.get());
+        return ugiReference.get();
     }
 
     protected abstract void doOnTrigger(ProcessContext context, ProcessSession 
session, FlowFile flowFile) throws ProcessException;
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
index eead5ed61d..ba2680c37e 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-common/src/main/java/org/apache/nifi/processors/iceberg/IcebergUtils.java
@@ -17,6 +17,7 @@
  */
 package org.apache.nifi.processors.iceberg;
 
+import com.google.common.base.Throwables;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -25,6 +26,8 @@ import org.apache.nifi.processor.ProcessContext;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 public class IcebergUtils {
@@ -65,4 +68,19 @@ public class IcebergUtils {
                         e -> 
context.getProperty(e.getKey()).evaluateAttributeExpressions(flowFile).getValue()
                 ));
     }
+
+    /**
+     * Returns an optional with the first throwable in the causal chain that 
is assignable to the provided cause type,
+     * and satisfies the provided cause predicate, {@link Optional#empty()} 
otherwise.
+     *
+     * @param t The throwable to inspect for the cause.
+     * @return Throwable Cause
+     */
+    public static <T extends Throwable> Optional<T> findCause(Throwable t, 
Class<T> expectedCauseType, Predicate<T> causePredicate) {
+        return Throwables.getCausalChain(t).stream()
+                .filter(expectedCauseType::isInstance)
+                .map(expectedCauseType::cast)
+                .filter(causePredicate)
+                .findFirst();
+    }
 }
diff --git 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
index f94453789f..0145124dcf 100644
--- 
a/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
+++ 
b/nifi-nar-bundles/nifi-iceberg-bundle/nifi-iceberg-processors/src/main/java/org/apache/nifi/processors/iceberg/PutIceberg.java
@@ -54,6 +54,7 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.services.iceberg.IcebergCatalogService;
+import org.ietf.jgss.GSSException;
 
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -64,11 +65,13 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
 import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.nifi.processors.iceberg.IcebergUtils.findCause;
 import static 
org.apache.nifi.processors.iceberg.IcebergUtils.getConfigurationFromFiles;
 import static 
org.apache.nifi.processors.iceberg.IcebergUtils.getDynamicProperties;
 
@@ -283,8 +286,16 @@ public class PutIceberg extends AbstractIcebergProcessor {
         try {
             table = loadTable(context, flowFile);
         } catch (Exception e) {
-            getLogger().error("Failed to load table from catalog", e);
-            session.transfer(session.penalize(flowFile), REL_FAILURE);
+            final Optional<GSSException> causeOptional = findCause(e, 
GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+            if (causeOptional.isPresent()) {
+                getLogger().warn("No valid Kerberos credential found, retrying 
login", causeOptional.get());
+                initKerberosCredentials(context);
+                session.rollback();
+                context.yield();
+            } else {
+                getLogger().error("Failed to load table from catalog", e);
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+            }
             return;
         }
 
@@ -309,16 +320,24 @@ public class PutIceberg extends AbstractIcebergProcessor {
             final WriteResult result = taskWriter.complete();
             appendDataFiles(context, flowFile, table, result);
         } catch (Exception e) {
-            getLogger().error("Exception occurred while writing iceberg 
records. Removing uncommitted data files", e);
+            final Optional<GSSException> causeOptional = findCause(e, 
GSSException.class, gsse -> GSSException.NO_CRED == gsse.getMajor());
+            if (causeOptional.isPresent()) {
+                getLogger().warn("No valid Kerberos credential found, retrying 
login", causeOptional.get());
+                initKerberosCredentials(context);
+                session.rollback();
+                context.yield();
+            } else {
+                getLogger().error("Exception occurred while writing Iceberg 
records", e);
+                session.transfer(session.penalize(flowFile), REL_FAILURE);
+            }
+
             try {
                 if (taskWriter != null) {
                     abort(taskWriter.dataFiles(), table);
                 }
             } catch (Exception ex) {
-                getLogger().error("Failed to abort uncommitted data files", 
ex);
+                getLogger().warn("Failed to abort uncommitted data files", ex);
             }
-
-            session.transfer(session.penalize(flowFile), REL_FAILURE);
             return;
         }
 

Reply via email to