This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this push:
     new 77b679ce91 Phoenix-7672 Handle Unclosed file via HDFS Lease Recovery in ReplicationLogReplay
77b679ce91 is described below

commit 77b679ce91cf48abb57f034d3f5aa16f6a6c8f7f
Author: Himanshu Gwalani <hgwalan...@gmail.com>
AuthorDate: Fri Sep 19 02:50:27 2025 +0530

    Phoenix-7672 Handle Unclosed file via HDFS Lease Recovery in ReplicationLogReplay
---
 phoenix-client-parent/pom.xml                      |   6 +
 phoenix-core-server/pom.xml                        |   8 +
 .../replication/reader/RecoverLeaseFSUtils.java    | 333 +++++++++++++++++++++
 .../reader/ReplicationLogProcessor.java            |  82 ++++-
 phoenix-core/pom.xml                               |   5 +
 .../reader/RecoverLeaseFSUtilsTest.java            | 169 +++++++++++
 .../reader/ReplicationLogProcessorTest.java        |   7 +-
 phoenix-mapreduce-byo-shaded-hbase/pom.xml         |   6 +
 8 files changed, 598 insertions(+), 18 deletions(-)

diff --git a/phoenix-client-parent/pom.xml b/phoenix-client-parent/pom.xml
index 0b3f15b8df..3cef52db14 100644
--- a/phoenix-client-parent/pom.xml
+++ b/phoenix-client-parent/pom.xml
@@ -112,6 +112,12 @@
                   <exclude>hbase-webapps/**</exclude>
                 </excludes>
               </filter>
+              <filter>
+                <artifact>*:*</artifact>
+                <excludes>
+                  <exclude>csv-bulk-load-config.properties</exclude>
+                </excludes>
+              </filter>
               <!-- Phoenix specific -->
             </filters>
             <transformers>
diff --git a/phoenix-core-server/pom.xml b/phoenix-core-server/pom.xml
index 42fb746f65..7f74c6eda1 100644
--- a/phoenix-core-server/pom.xml
+++ b/phoenix-core-server/pom.xml
@@ -59,6 +59,14 @@
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-mapreduce-client-core</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+        </dependency>
 
         <!-- HBase dependencies -->
         <dependency>
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java
new file mode 100644
index 0000000000..23adb8509d
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.LeaseRecoverable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for recovering file lease for hdfs.
+ */
+public class RecoverLeaseFSUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class);
+
+    /**
+     * The lease recovery timeout in milliseconds.
+     *
+     * Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
+     * usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
+     * beyond that limit 'to be safe'.
+     */
+    public static final String REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND =
+            "phoenix.replication.replay.lease.recovery.timeout.millis";
+
+    public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND = 900000;
+
+    /**
+     * The first pause before retrying the lease recovery.
+     * This setting should be a little above what the cluster dfs heartbeat is set to.
+     */
+    public static final String REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND =
+            "phoenix.replication.replay.lease.recovery.first.pause.millis";
+
+    public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND =
+            4000;
+
+    /**
+     * The pause between subsequent retries of the lease recovery.
+     */
+    public static final String REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND =
+            "phoenix.replication.replay.lease.recovery.pause.millis";
+
+    public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND = 1000;
+
+    /**
+     * The dfs timeout during lease recovery in milliseconds.
+     *
+     * This should be set to how long it'll take for us to time out against the primary datanode
+     * if it is dead. We set it to 64 seconds, 4 seconds more than the default READ_TIMEOUT in
+     * HDFS, the default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing
+     * after this timeout, then further recovery will take linear backoff with this base, to
+     * avoid endless preemptions when this value is not properly configured.
+     */
+    public static final String REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND =
+            "phoenix.replication.replay.lease.recovery.dfs.timeout.millis";
+
+    public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND =
+            64 * 1000;
+
+    private static Class<?> leaseRecoverableClazz = null;
+    private static Method recoverLeaseMethod = null;
+    public static final String LEASE_RECOVERABLE_CLASS_NAME =
+            LeaseRecoverable.class.getCanonicalName();
+    static {
+        LOG.debug("Initialize RecoverLeaseFSUtils");
+        initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
+    }
+
+    /**
+     * Initialize reflection classes and methods. If the LeaseRecoverable class is not found,
+     * look for the DistributedFileSystem#recoverLease method.
+     */
+    static void initializeRecoverLeaseMethod(String className) {
+        try {
+            leaseRecoverableClazz = Class.forName(className);
+            recoverLeaseMethod = leaseRecoverableClazz.getMethod("recoverLease", Path.class);
+            LOG.debug("set recoverLeaseMethod to " + className + ".recoverLease()");
+        } catch (ClassNotFoundException e) {
+            LOG.debug("LeaseRecoverable interface not in the classpath, "
+                    + "this means Hadoop 3.3.5 or below.");
+            try {
+                recoverLeaseMethod = DistributedFileSystem.class.getMethod("recoverLease",
+                        Path.class);
+            } catch (NoSuchMethodException ex) {
+                LOG.error("Cannot find recoverLease method in 
DistributedFileSystem class. "
+                        + "It should never happen. Abort.", ex);
+                throw new RuntimeException(ex);
+            }
+        } catch (NoSuchMethodException e) {
+            LOG.error("Cannot find recoverLease method in LeaseRecoverable 
class. "
+                    + "It should never happen. Abort.", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    private RecoverLeaseFSUtils() {
+    }
+
+    public static void recoverFileLease(FileSystem fs, Path p, Configuration conf)
+            throws IOException {
+        recoverFileLease(fs, p, conf, null);
+    }
+
+    /**
+     * Recover the lease from Hadoop file system, retrying multiple times.
+     */
+    public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
+                                        CancelableProgressable reporter) throws IOException {
+        if (fs instanceof FilterFileSystem) {
+            fs = ((FilterFileSystem) fs).getRawFileSystem();
+        }
+
+        // lease recovery not needed for local file system case.
+        if (isLeaseRecoverable(fs)) {
+            recoverDFSFileLease(fs, p, conf, reporter);
+        }
+    }
+
+    public static boolean isLeaseRecoverable(FileSystem fs) {
+        // return true if HDFS.
+        if (fs instanceof DistributedFileSystem) {
+            return true;
+        }
+        // return true if the file system implements LeaseRecoverable interface.
+        if (leaseRecoverableClazz != null) {
+            return leaseRecoverableClazz.isAssignableFrom(fs.getClass());
+        }
+        // return false if the file system is not HDFS and does not implement LeaseRecoverable.
+        return false;
+    }
+
+    /**
+     * Run the dfs recover lease. recoverLease is asynchronous. It returns:
+     *   - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
+     *   - true when the lease recovery has succeeded or the file is closed.
+     * But, we have to be careful. Each time we call recoverLease, it starts the recover lease
+     * process over from the beginning. We could put ourselves in a situation where we are doing
+     * nothing but starting a recovery, interrupting it to start again, and so on.
+     * The findings over in HBASE-8354 have it that the namenode will try to recover the lease on
+     * the file's primary node. If all is well, it should return near immediately. But, as is
+     * common, it is the very primary node that has crashed and so the namenode will be stuck
+     * waiting on a socket timeout before it will ask another datanode to start the recovery. It
+     * does not help if we call recoverLease in the meantime; in particular, after the socket
+     * timeout, a recoverLease invocation will cause us to start over from square one (possibly
+     * waiting on a socket timeout against the primary node). So, in the below, we do the
+     * following:
+     *   1. Call recoverLease.
+     *   2. If it returns true, break.
+     *   3. If it returns false, wait a few seconds and then call it again.
+     *   4. If it returns true, break.
+     *   5. If it returns false, wait for what we think the datanode socket timeout is
+     *      (configurable) and then try again.
+     *   6. If it returns true, break.
+     *   7. If it returns false, repeat starting at step 5 above.
+     * If isFileClosed (HDFS-4525) is available, call it every second; we might be able to exit
+     * early.
+     */
+    private static boolean recoverDFSFileLease(final FileSystem dfs, final Path p,
+                                               final Configuration conf,
+                                               final CancelableProgressable reporter)
+                                               throws IOException {
+        LOG.info("Recover lease on dfs file " + p);
+
+        long startWaiting = EnvironmentEdgeManager.currentTime();
+        long recoveryTimeout = conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND,
+                DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND) + startWaiting;
+        long firstPause = conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND,
+                DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND);
+        long subsequentPauseBase = conf.getLong(
+                REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
+                DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND);
+
+        Method isFileClosedMeth = null;
+        // whether we need to look for isFileClosed method
+        boolean findIsFileClosedMeth = true;
+        boolean recovered = false;
+        // We break the loop if lease recovery succeeds, we time out, or an exception is thrown.
+        for (int nbAttempt = 0; !recovered; nbAttempt++) {
+            recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
+            if (recovered) {
+                break;
+            }
+            checkIfCancelled(reporter);
+            if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
+                break;
+            }
+            try {
+                // On the first time through wait the short 'firstPause'.
+                if (nbAttempt == 0) {
+                    Thread.sleep(firstPause);
+                } else {
+                    // Cycle here until (subsequentPauseBase * nbAttempt) elapses. While spinning,
+                    // check isFileClosed if available (should be in hadoop 2.0.5...
+                    // not in hadoop 1 though).
+                    long localStartWaiting = EnvironmentEdgeManager.currentTime();
+                    while (
+                            (EnvironmentEdgeManager.currentTime() - localStartWaiting)
+                                    < subsequentPauseBase * nbAttempt
+                    ) {
+                        Thread.sleep(conf.getLong(
+                                REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND,
+                                DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND));
+                        if (findIsFileClosedMeth) {
+                            try {
+                                isFileClosedMeth = dfs.getClass().getMethod("isFileClosed",
+                                        new Class[] { Path.class });
+                            } catch (NoSuchMethodException nsme) {
+                                LOG.debug("isFileClosed not available");
+                            } finally {
+                                findIsFileClosedMeth = false;
+                            }
+                        }
+                        if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
+                            recovered = true;
+                            break;
+                        }
+                        checkIfCancelled(reporter);
+                    }
+                }
+            } catch (InterruptedException ie) {
+                InterruptedIOException iioe = new InterruptedIOException();
+                iioe.initCause(ie);
+                throw iioe;
+            }
+        }
+        return recovered;
+    }
+
+    private static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
+                                           final int nbAttempt, final Path p,
+                                           final long startWaiting) {
+        if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) {
+            LOG.warn("Cannot recoverLease after trying for "
+                    + conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND,
+                            DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND)
+                    + "ms " + REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND
+                    + " continuing, but may be DATALOSS!!!; "
+                    + getLogMessageDetail(nbAttempt, p, startWaiting));
+            return true;
+        }
+        return false;
+    }
+
+    /**
+     * Try to recover the lease.
+     * @return True if dfs#recoverLease returned true.
+     */
+    private static boolean recoverLease(final FileSystem dfs, final int nbAttempt, final Path p,
+                                        final long startWaiting) throws FileNotFoundException {
+        boolean recovered = false;
+        try {
+            recovered = (Boolean) recoverLeaseMethod.invoke(dfs, p);
+            LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ")
+                    + getLogMessageDetail(nbAttempt, p, startWaiting));
+        } catch (InvocationTargetException ite) {
+            final Throwable e = ite.getCause();
+            if (e instanceof LeaseExpiredException
+                    && e.getMessage().contains("File does not exist")) {
+                // This exception comes out instead of FNFE, fix it
+                throw new FileNotFoundException("The given replication log wasn't found at " + p);
+            } else if (e instanceof FileNotFoundException) {
+                throw (FileNotFoundException) e;
+            }
+            LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
+        } catch (IllegalAccessException e) {
+            LOG.error("Failed to call recoverLease on {}. Abort.", dfs, e);
+            throw new RuntimeException(e);
+        }
+        return recovered;
+    }
+
+    /**
+     * Returns detail to append to any log message around lease recovery.
+     */
+    private static String getLogMessageDetail(final int nbAttempt, final Path p,
+                                              final long startWaiting) {
+        return "attempt=" + nbAttempt + " on file=" + p + " after "
+                + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms";
+    }
+
+    /**
+     * Call HDFS-4525 isFileClosed if it is available.
+     * @return True if file is closed.
+     */
+    private static boolean isFileClosed(final FileSystem dfs, final Method m, final Path p) {
+        try {
+            return (Boolean) m.invoke(dfs, p);
+        } catch (SecurityException e) {
+            LOG.warn("No access", e);
+        } catch (Exception e) {
+            LOG.warn("Failed invocation for " + p.toString(), e);
+        }
+        return false;
+    }
+
+    private static void checkIfCancelled(final CancelableProgressable reporter)
+            throws InterruptedIOException {
+        if (reporter == null) {
+            return;
+        }
+        if (!reporter.progress()) {
+            throw new InterruptedIOException("Operation cancelled");
+        }
+    }
+
+}
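
For orientation, here is a minimal sketch (not part of this commit) of how a caller might drive
the new utility before opening a possibly-unclosed replication log; the file path and the timeout
override below are hypothetical assumptions:

// Illustrative sketch only: recover an HDFS lease before reading a replication log.
// The path and the timeout override are hypothetical, not values from this commit.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.phoenix.replication.reader.RecoverLeaseFSUtils;

public class LeaseRecoveryExample {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Optionally shorten the overall recovery budget from the 15-minute default.
        conf.setLong(
                RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND,
                60_000L);
        Path log = new Path("hdfs:///phoenix/replication/example.log"); // hypothetical path
        FileSystem fs = log.getFileSystem(conf);
        // No-op for file systems that are not lease-recoverable (e.g. the local FS);
        // on HDFS this retries recoverLease until the file closes or the timeout elapses.
        RecoverLeaseFSUtils.recoverFileLease(fs, log, conf);
        // The file is now safe to open for reading.
    }
}
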
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index 73dbfdbdf9..4a47779d70 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -24,6 +24,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -31,6 +32,7 @@ import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LeaseRecoverable;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.client.AsyncTable;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.phoenix.replication.log.InvalidLogTrailerException;
 import org.apache.phoenix.replication.log.LogFile;
 import org.apache.phoenix.replication.log.LogFileReader;
 import org.apache.phoenix.replication.log.LogFileReaderContext;
@@ -168,7 +171,7 @@ public class ReplicationLogProcessor implements Closeable {
      */
     public static ReplicationLogProcessor get(Configuration conf, String haGroupName) {
         return INSTANCES.computeIfAbsent(haGroupName,
-            k -> new ReplicationLogProcessor(conf, haGroupName));
+                k -> new ReplicationLogProcessor(conf, haGroupName));
     }
 
     /**
@@ -194,9 +197,9 @@ public class ReplicationLogProcessor implements Closeable {
         decorateConf();
         this.metrics = createMetricsSource();
         this.executorService = Executors.newFixedThreadPool(threadPoolSize,
-            new ThreadFactoryBuilder()
-                .setNameFormat("Phoenix-Replication-Log-Processor-" + haGroupName + "-%d")
-                .build());
+                new ThreadFactoryBuilder()
+                        .setNameFormat("Phoenix-Replication-Log-Processor-" + haGroupName + "-%d")
+                        .build());
     }
 
     /**
@@ -233,7 +236,15 @@ public class ReplicationLogProcessor implements Closeable {
 
         try {
             // Create the LogFileReader for given path
-            logFileReader = createLogFileReader(fs, filePath);
+            Optional<LogFileReader> logFileReaderOptional = 
createLogFileReader(fs, filePath);
+
+            if (!logFileReaderOptional.isPresent()) {
+                // This is an empty file, assume processed successfully and return
+                LOG.warn("Found empty file to process {}", filePath);
+                return;
+            }
+
+            logFileReader = logFileReaderOptional.get();
 
             for (LogFile.Record record : logFileReader) {
                 final TableName tableName = TableName.valueOf(record.getHBaseTableName());
@@ -287,23 +298,60 @@ public class ReplicationLogProcessor implements Closeable {
      * @return A configured LogFileReader instance
      * @throws IOException if the file doesn't exist or initialization fails
      */
-    protected LogFileReader createLogFileReader(FileSystem fs, Path filePath) throws IOException {
+    protected Optional<LogFileReader> createLogFileReader(FileSystem fs, Path filePath)
+            throws IOException {
         // Ensure that file exists. If we face exception while checking the path itself,
         // method would throw same exception back to the caller
         if (!fs.exists(filePath)) {
             throw new IOException("Log file does not exist: " + filePath);
         }
         LogFileReader logFileReader = new LogFileReader();
-        try {
-            LogFileReaderContext logFileReaderContext = new LogFileReaderContext(conf)
-                    .setFileSystem(fs).setFilePath(filePath);
+        LogFileReaderContext logFileReaderContext = new LogFileReaderContext(conf)
+                .setFileSystem(fs).setFilePath(filePath);
+        boolean isClosed = isFileClosed(fs, filePath);
+        if (isClosed) {
+            // As file is closed, ensure that the file has a valid header and trailer
             logFileReader.init(logFileReaderContext);
-        } catch (IOException exception) {
-            LOG.error("Failed to initialize new LogFileReader for path {}",
-                    filePath, exception);
-            throw exception;
+            return Optional.of(logFileReader);
+        } else {
+            LOG.warn("Found un-closed file {}. Starting lease recovery.", 
filePath);
+            recoverLease(fs, filePath);
+            if (fs.getFileStatus(filePath).getLen() <= 0) {
+                // Found empty file, returning empty Optional
+                return Optional.empty();
+            }
+            try {
+                // Acquired the lease, try to create the reader, validating both header and trailer
+                logFileReader.init(logFileReaderContext);
+                return Optional.of(logFileReader);
+            } catch (InvalidLogTrailerException invalidLogTrailerException) {
+                // If trailer is missing or corrupt, create reader without trailer validation
+                LOG.warn("Invalid Trailer for file {}",
+                        filePath, invalidLogTrailerException);
+                logFileReaderContext.setValidateTrailer(false);
+                logFileReader.init(logFileReaderContext);
+                return Optional.of(logFileReader);
+            } catch (IOException exception) {
+                LOG.error("Failed to initialize new LogFileReader for path {}",
+                        filePath, exception);
+                throw exception;
+            }
+        }
+    }
+
+    protected void recoverLease(final FileSystem fs, final Path filePath) throws IOException {
+        RecoverLeaseFSUtils.recoverFileLease(fs, filePath, conf);
+    }
+
+    protected boolean isFileClosed(final FileSystem fs, final Path filePath) throws IOException {
+        boolean isClosed;
+        try {
+            isClosed = ((LeaseRecoverable) fs).isFileClosed(filePath);
+        } catch (ClassCastException classCastException) {
+            // If filesystem is not of type LeaseRecoverable, assume file is always closed
+            isClosed = true;
         }
-        return logFileReader;
+        return isClosed;
     }
 
     /**
@@ -353,7 +401,8 @@ public class ReplicationLogProcessor implements Closeable {
 
                 // Update current operations for next retry
                 currentOperations = result.getFailedMutations();
-                lastError = new IOException("Failed to apply the mutations", result.getException());
+                lastError = new IOException("Failed to apply the mutations",
+                        result.getException());
             } catch (IOException e) {
                 lastError = e;
             }
@@ -427,7 +476,8 @@ public class ReplicationLogProcessor implements Closeable {
                 // Add failed mutations to retry list
                 failedOperations.put(tableName, tableMutationMap.get(tableName));
                 getMetrics().incrementFailedMutationsCount(tableMutationMap.get(tableName).size());
-                LOG.debug("Failed to apply mutations for table {}: {}", tableName, e.getMessage());
+                LOG.debug("Failed to apply mutations for table {}: {}", tableName,
+                        e.getMessage());
                 lastException = e;
             }
         }
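
To summarize the new read path, a condensed restatement of the createLogFileReader() decision
flow follows; this is a paraphrase, and the initWithTrailerValidation/initWithoutTrailerValidation
helpers are hypothetical stand-ins for LogFileReader.init() with and without trailer validation,
not methods from this commit:

// Paraphrase of the new read-path decision flow; sketch only, not code from this commit.
static Optional<LogFileReader> openForReplay(FileSystem fs, Path path, Configuration conf)
        throws IOException {
    if (isFileClosed(fs, path)) {
        // Closed file: a valid header and trailer are required.
        return Optional.of(initWithTrailerValidation(fs, path, conf));
    }
    // Unclosed file: recover the HDFS lease before reading.
    RecoverLeaseFSUtils.recoverFileLease(fs, path, conf);
    if (fs.getFileStatus(path).getLen() <= 0) {
        // Empty file: treat as already processed.
        return Optional.empty();
    }
    try {
        return Optional.of(initWithTrailerValidation(fs, path, conf));
    } catch (InvalidLogTrailerException e) {
        // Trailer missing or corrupt: fall back to reading without trailer validation.
        return Optional.of(initWithoutTrailerValidation(fs, path, conf));
    }
}
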
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index c049ab4331..1287d72f5f 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -244,6 +244,11 @@
             <artifactId>hbase-compression-aircompressor</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hbase</groupId>
+            <artifactId>hbase-asyncfs</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <!-- HBase Adjacent Dependencies -->
         <dependency>
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
new file mode 100644
index 0000000000..6df8577095
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static org.apache.phoenix.replication.reader.RecoverLeaseFSUtils.LEASE_RECOVERABLE_CLASS_NAME;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+/**
+ * Test our recoverLease loop against a mocked-up filesystem.
+ */
+public class RecoverLeaseFSUtilsTest extends ParallelStatsDisabledIT {
+
+    @ClassRule
+    public static TemporaryFolder testFolder = new TemporaryFolder();
+
+    private static Configuration conf;
+    private static FileSystem localFs;
+    private static Path FILE;
+
+    @BeforeClass
+    public static void setupBeforeClass() throws Exception {
+        conf = getUtility().getConfiguration();
+        localFs = FileSystem.getLocal(conf);
+        conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND, 10);
+        conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND, 10);
+        FILE = new Path(testFolder.newFile("file.txt").toURI());
+    }
+
+    @AfterClass
+    public static void cleanUp() throws IOException {
+        localFs.delete(new Path(testFolder.getRoot().toURI()), true);
+    }
+
+    /**
+     * Test recover lease eventually succeeding.
+     */
+    @Test
+    public void testRecoverLease() throws IOException {
+        long startTime = EnvironmentEdgeManager.currentTime();
+        conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND, 1000);
+        CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class);
+        Mockito.when(reporter.progress()).thenReturn(true);
+        DistributedFileSystem dfs = Mockito.mock(DistributedFileSystem.class);
+        // Fail four times and pass on the fifth.
+        Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(false)
+                .thenReturn(false).thenReturn(true);
+        RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, conf, reporter);
+        Mockito.verify(dfs, Mockito.times(5)).recoverLease(FILE);
+        // Make sure we waited at least the lease recovery dfs timeout * 3 (the first two
+        // invocations will happen pretty fast... then we fall into the longer wait loop).
+        assertTrue((EnvironmentEdgeManager.currentTime() - startTime) > (3 * conf.getLong(
+                RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
+                61000)));
+    }
+
+    /**
+     * Test that we can use reflection to access LeaseRecoverable methods.
+     */
+    @Test
+    public void testLeaseRecoverable() throws IOException {
+        try {
+            // set LeaseRecoverable to FakeLeaseRecoverable for testing
+            RecoverLeaseFSUtils.initializeRecoverLeaseMethod(FakeLeaseRecoverable.class.getName());
+            RecoverableFileSystem mockFS = Mockito.mock(RecoverableFileSystem.class);
+            Mockito.when(mockFS.recoverLease(FILE)).thenReturn(true);
+            RecoverLeaseFSUtils.recoverFileLease(mockFS, FILE, conf);
+            Mockito.verify(mockFS, Mockito.times(1)).recoverLease(FILE);
+            assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(Mockito.mock(RecoverableFileSystem.class)));
+        } finally {
+            RecoverLeaseFSUtils.initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
+        }
+    }
+
+    /**
+     * Test that isFileClosed makes us recover lease faster.
+     */
+    @Test
+    public void testIsFileClosed() throws IOException {
+        // Make this time long so it is plain we broke out because of the isFileClosed invocation.
+        conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND, 100000);
+        CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class);
+        Mockito.when(reporter.progress()).thenReturn(true);
+        IsFileClosedDistributedFileSystem dfs = Mockito.mock(IsFileClosedDistributedFileSystem.class);
+        // Now make it so we fail the first two times -- the two fast invocations, then we fall
+        // into the long loop during which we will call isFileClosed.... the next invocation
+        // should therefore return true if we are to break the loop.
+        Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(true);
+        Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true);
+        RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, conf, reporter);
+        Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE);
+        Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
+    }
+
+    /**
+     * Test isLeaseRecoverable for both distributed and local FS
+     */
+    @Test
+    public void testIsLeaseRecoverable() {
+        assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(new DistributedFileSystem()));
+        assertFalse(RecoverLeaseFSUtils.isLeaseRecoverable(new LocalFileSystem()));
+    }
+
+    private interface FakeLeaseRecoverable {
+        @SuppressWarnings("unused")
+        boolean recoverLease(Path p) throws IOException;
+
+        @SuppressWarnings("unused")
+        boolean isFileClosed(Path p) throws IOException;
+    }
+
+    private static abstract class RecoverableFileSystem extends FileSystem
+            implements FakeLeaseRecoverable {
+        @Override
+        public boolean recoverLease(Path p) throws IOException {
+            return true;
+        }
+
+        @Override
+        public boolean isFileClosed(Path p) throws IOException {
+            return true;
+        }
+    }
+
+    /**
+     * Version of DFS that has HDFS-4525 in it.
+     */
+    private static class IsFileClosedDistributedFileSystem extends DistributedFileSystem {
+        /**
+         * Close status of a file. Copied over from HDFS-4525
+         * @return true if file is already closed
+         **/
+        @Override
+        public boolean isFileClosed(Path f) throws IOException {
+            return false;
+        }
+    }
+}
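
For local verification, a standard Maven/surefire invocation along these lines should run the new
test in isolation (the exact module and test flags here are an assumption, not part of this
commit):

    mvn -pl phoenix-core test -Dtest=RecoverLeaseFSUtilsTest
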
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
index 2bbc18a35d..7d1ce01705 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
@@ -138,10 +139,12 @@ public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT {
 
         // Test createLogFileReader with valid file - should succeed
         ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, testHAGroupName);
-        LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath);
+        Optional<LogFileReader> optionalLogFileReader = replicationLogProcessor.createLogFileReader(localFs, validFilePath);
 
         // Verify reader is created successfully
-        assertNotNull("Reader should not be null for valid file", reader);
+        assertNotNull("Reader should not be null for valid file", 
optionalLogFileReader);
+        assertTrue("Reader should be present for valid file", 
optionalLogFileReader.isPresent());
+        LogFileReader reader = optionalLogFileReader.get();
         assertNotNull("Reader context should not be null", 
reader.getContext());
         assertEquals("File path should match", validFilePath, 
reader.getContext().getFilePath());
         assertEquals("File system should match", localFs, 
reader.getContext().getFileSystem());
diff --git a/phoenix-mapreduce-byo-shaded-hbase/pom.xml b/phoenix-mapreduce-byo-shaded-hbase/pom.xml
index d98b42160d..077d91691c 100644
--- a/phoenix-mapreduce-byo-shaded-hbase/pom.xml
+++ b/phoenix-mapreduce-byo-shaded-hbase/pom.xml
@@ -120,6 +120,12 @@
                 </excludes>
               </filter>
               <!-- Phoenix specific -->
+              <filter>
+                <artifact>*:*</artifact>
+                <excludes>
+                  <exclude>csv-bulk-load-config.properties</exclude>
+                </excludes>
+              </filter>
             </filters>
             <transformers>
               <transformer

