This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by this push:
     new aad16f60c7 Phoenix-7672 Handle Unclosed file via HDFS Lease Recovery in ReplicationLogReplay (#2345)
aad16f60c7 is described below

commit aad16f60c795eda58a9d352c1e38452570638585
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Sat Jan 10 22:03:59 2026 +0530

    Phoenix-7672 Handle Unclosed file via HDFS Lease Recovery in ReplicationLogReplay (#2345)
---
 phoenix-client-parent/pom.xml                      |   6 +
 phoenix-core-server/pom.xml                        |   8 +
 .../replication/reader/RecoverLeaseFSUtils.java    | 320 +++++++++++++++++++++
 .../reader/ReplicationLogProcessor.java            |  65 ++++-
 phoenix-core/pom.xml                               |   5 +
 .../reader/ReplicationLogProcessorTestIT.java      |  16 +-
 .../reader/RecoverLeaseFSUtilsTest.java            | 170 +++++++++++
 phoenix-mapreduce-byo-shaded-hbase/pom.xml         |   6 +
 8 files changed, 577 insertions(+), 19 deletions(-)
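
For context, the patch introduces new lease-recovery settings (key names and defaults are taken from RecoverLeaseFSUtils in this commit); a minimal sketch of how a deployment might tune them, with illustrative values, is:

    // Hedged tuning sketch; defaults in the patch are 900000, 4000, 1000 and 64000 ms respectively.
    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    conf.setLong("phoenix.replication.replay.lease.recovery.timeout.millis", 600000L);   // overall recovery timeout
    conf.setLong("phoenix.replication.replay.lease.recovery.first.pause.mills", 4000L);  // first pause, just above dfs heartbeat
    conf.setLong("phoenix.replication.replay.lease.recovery.pause.mills", 1000L);        // pause between subsequent polls
    conf.setLong("phoenix.replication.replay.lease.recovery.dfs.timeout.mills", 64000L); // per-attempt dfs timeout base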

diff --git a/phoenix-client-parent/pom.xml b/phoenix-client-parent/pom.xml
index 47b62405c3..02ff3d6272 100644
--- a/phoenix-client-parent/pom.xml
+++ b/phoenix-client-parent/pom.xml
@@ -110,6 +110,12 @@
                   <exclude>hbase-webapps/**</exclude>
                 </excludes>
               </filter>
+              <filter>
+                <artifact>*:*</artifact>
+                <excludes>
+                  <exclude>csv-bulk-load-config.properties</exclude>
+                </excludes>
+              </filter>
               <!-- Phoenix specific -->
             </filters>
             <transformers>
diff --git a/phoenix-core-server/pom.xml b/phoenix-core-server/pom.xml
index ef5140feb9..914a7078ee 100644
--- a/phoenix-core-server/pom.xml
+++ b/phoenix-core-server/pom.xml
@@ -57,6 +57,14 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-core</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs-client</artifactId>
+    </dependency>
 
     <!-- HBase dependencies -->
     <dependency>
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java
new file mode 100644
index 0000000000..a4ad4aa061
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.LeaseRecoverable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for recovering file leases from HDFS.
+ */
+public class RecoverLeaseFSUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class);
+
+  /**
+   * The lease recovery timeout in milliseconds. Default is 15 minutes. It's huge, but the idea is
+   * that if we have a major issue, HDFS usually needs 10 minutes before marking the nodes as dead.
+   * So we're putting ourselves beyond that limit 'to be safe'.
+   */
+  public static final String REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND =
+    "phoenix.replication.replay.lease.recovery.timeout.millis";
+
+  public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND = 900000;
+
+  /**
+   * The first pause before retrying the lease recovery. This setting should be a little above what
+   * the cluster dfs heartbeat is set to.
+   */
+  public static final String REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND =
+    "phoenix.replication.replay.lease.recovery.first.pause.mills";
+
+  public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND = 4000;
+
+  /**
+   * The pause between subsequent retries of the lease recovery.
+   */
+  public static final String REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND =
+    "phoenix.replication.replay.lease.recovery.pause.mills";
+
+  public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND = 1000;
+
+  /**
+   * The dfs timeout during lease recovery in milliseconds. This should be set to how long it'll
+   * take for us to timeout against the primary datanode if it is dead. We set it to 64 seconds, 4
+   * seconds more than the default READ_TIMEOUT in HDFS, the default value for
+   * DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this timeout, then further
+   * recovery will take linear backoff with this base, to avoid endless preemptions when this value
+   * is not properly configured.
+   */
+  public static final String REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND =
+    "phoenix.replication.replay.lease.recovery.dfs.timeout.mills";
+
+  public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND =
+    64 * 1000;
+
+  private static Class<?> leaseRecoverableClazz = null;
+  private static Method recoverLeaseMethod = null;
+  public static final String LEASE_RECOVERABLE_CLASS_NAME =
+    LeaseRecoverable.class.getCanonicalName();
+  // static {
+  // } "org.apache.hadoop.fs.LeaseRecoverable";
+  static {
+    LOG.debug("Initialize RecoverLeaseFSUtils");
+    initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
+  }
+
+  /**
+   * Initialize reflection classes and methods. If the LeaseRecoverable class is not found, look for
+   * the DistributedFileSystem#recoverLease method.
+   */
+  static void initializeRecoverLeaseMethod(String className) {
+    try {
+      leaseRecoverableClazz = Class.forName(className);
+      recoverLeaseMethod = leaseRecoverableClazz.getMethod("recoverLease", Path.class);
+      LOG.debug("set recoverLeaseMethod to " + className + ".recoverLease()");
+    } catch (ClassNotFoundException e) {
+      LOG.debug(
+        "LeaseRecoverable interface not in the classpath, " + "this means 
Hadoop 3.3.5 or below.");
+      try {
+        recoverLeaseMethod = DistributedFileSystem.class.getMethod("recoverLease", Path.class);
+      } catch (NoSuchMethodException ex) {
+        LOG.error("Cannot find recoverLease method in DistributedFileSystem 
class. "
+          + "It should never happen. Abort.", ex);
+        throw new RuntimeException(ex);
+      }
+    } catch (NoSuchMethodException e) {
+      LOG.error("Cannot find recoverLease method in LeaseRecoverable class. "
+        + "It should never happen. Abort.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private RecoverLeaseFSUtils() {
+  }
+
+  public static void recoverFileLease(FileSystem fs, Path p, Configuration conf)
+    throws IOException {
+    recoverFileLease(fs, p, conf, null);
+  }
+
+  /**
+   * Recover the lease from Hadoop file system, retrying multiple times.
+   */
+  public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
+    CancelableProgressable reporter) throws IOException {
+    if (fs instanceof FilterFileSystem) {
+      fs = ((FilterFileSystem) fs).getRawFileSystem();
+    }
+
+    // lease recovery not needed for local file system case.
+    if (isLeaseRecoverable(fs)) {
+      recoverDFSFileLease(fs, p, conf, reporter);
+    }
+  }
+
+  public static boolean isLeaseRecoverable(FileSystem fs) {
+    // return true if HDFS.
+    if (fs instanceof DistributedFileSystem) {
+      return true;
+    }
+    // return true if the file system implements LeaseRecoverable interface.
+    if (leaseRecoverableClazz != null) {
+      return leaseRecoverableClazz.isAssignableFrom(fs.getClass());
+    }
+    // return false if the file system is not HDFS and does not implement LeaseRecoverable.
+    return false;
+  }
+
+  /**
+   * Run the dfs recover lease. recoverLease is asynchronous. It returns false when it starts the
+   * lease recovery (i.e. lease recovery not *yet* done), and true when the lease recovery has
+   * succeeded or the file is closed. But, we have to be careful. Each time we call recoverLease, it
+   * starts the recover lease process over from the beginning. We could put ourselves in a situation
+   * where we are doing nothing but starting a recovery, interrupting it to start again, and so on.
+   * The findings over in HBASE-8354 have it that the namenode will try to recover the lease on the
+   * file's primary node. If all is well, it should return near immediately. But, as is common, it
+   * is the very primary node that has crashed and so the namenode will be stuck waiting on a socket
+   * timeout before it will ask another datanode to start the recovery. It does not help if we call
+   * recoverLease in the meantime and in particular, after the socket timeout, a recoverLease
+   * invocation will cause us to start over from square one (possibly waiting on socket timeout
+   * against the primary node). So, in the below, we do the following: 1. Call recoverLease. 2. If it
+   * returns true, break. 3. If it returns false, wait a few seconds and then call it again. 4. If
+   * it returns true, break. 5. If it returns false, wait for what we think the datanode socket
+   * timeout is (configurable) and then try again. 6. If it returns true, break. 7. If it returns
+   * false, repeat starting at step 5 above. If HDFS-4525's isFileClosed is available, call it every
+   * second, and we might be able to exit early.
+   */
+  private static boolean recoverDFSFileLease(final FileSystem dfs, final Path p,
+    final Configuration conf, final CancelableProgressable reporter) throws IOException {
+    LOG.info("Recover lease on dfs file " + p);
+
+    long startWaiting = EnvironmentEdgeManager.currentTime();
+    long recoveryTimeout = conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND,
+      DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND) + startWaiting;
+    long firstPause = conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND,
+      DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND);
+    long subsequentPauseBase =
+      conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
+        DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND);
+
+    Method isFileClosedMeth = null;
+    // whether we need to look for isFileClosed method
+    boolean findIsFileClosedMeth = true;
+    boolean recovered = false;
+    // We break the loop if we succeed the lease recovery, timeout, or we throw an exception.
+    for (int nbAttempt = 0; !recovered; nbAttempt++) {
+      recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
+      if (recovered) {
+        break;
+      }
+      checkIfCancelled(reporter);
+      if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
+        break;
+      }
+      try {
+        // On the first time through wait the short 'firstPause'.
+        if (nbAttempt == 0) {
+          Thread.sleep(firstPause);
+        } else {
+          // Cycle here until (subsequentPause * nbAttempt) elapses. While spinning,
+          // check isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though).
+          long localStartWaiting = EnvironmentEdgeManager.currentTime();
+          while (
+            (EnvironmentEdgeManager.currentTime() - localStartWaiting)
+                < subsequentPauseBase * nbAttempt
+          ) {
+            Thread.sleep(conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND,
+              DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND));
+            if (findIsFileClosedMeth) {
+              try {
+                isFileClosedMeth =
+                  dfs.getClass().getMethod("isFileClosed", new Class[] { Path.class });
+              } catch (NoSuchMethodException nsme) {
+                LOG.debug("isFileClosed not available");
+              } finally {
+                findIsFileClosedMeth = false;
+              }
+            }
+            if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
+              recovered = true;
+              break;
+            }
+            checkIfCancelled(reporter);
+          }
+        }
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
+    return recovered;
+  }
+
+  private static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
+    final int nbAttempt, final Path p, final long startWaiting) {
+    if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) {
+      LOG.warn("Cannot recoverLease after trying for "
+        + conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND,
+          DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND)
+        + "ms " + REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND
+        + " continuing, but may be DATALOSS!!!; "
+        + getLogMessageDetail(nbAttempt, p, startWaiting));
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Try to recover the lease.
+   * @return True if dfs#recoverLease returned true.
+   */
+  private static boolean recoverLease(final FileSystem dfs, final int nbAttempt, final Path p,
+    final long startWaiting) throws FileNotFoundException {
+    boolean recovered = false;
+    try {
+      recovered = (Boolean) recoverLeaseMethod.invoke(dfs, p);
+      LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ")
+        + getLogMessageDetail(nbAttempt, p, startWaiting));
+    } catch (InvocationTargetException ite) {
+      final Throwable e = ite.getCause();
+      if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
+        // This exception comes out instead of FNFE, fix it
+        throw new FileNotFoundException("The given replication log wasn't found at " + p);
+      } else if (e instanceof FileNotFoundException) {
+        throw (FileNotFoundException) e;
+      }
+      LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Failed to call recoverLease on {}. Abort.", dfs, e);
+      throw new RuntimeException(e);
+    }
+    return recovered;
+  }
+
+  /**
+   * Returns Detail to append to any log message around lease recovering.
+   */
+  private static String getLogMessageDetail(final int nbAttempt, final Path p,
+    final long startWaiting) {
+    return "attempt=" + nbAttempt + " on file=" + p + " after "
+      + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms";
+  }
+
+  /**
+   * Call HDFS-4525 isFileClosed if it is available.
+   * @return True if file is closed.
+   */
+  private static boolean isFileClosed(final FileSystem dfs, final Method m, final Path p) {
+    try {
+      return (Boolean) m.invoke(dfs, p);
+    } catch (SecurityException e) {
+      LOG.warn("No access", e);
+    } catch (Exception e) {
+      LOG.warn("Failed invocation for " + p.toString(), e);
+    }
+    return false;
+  }
+
+  private static void checkIfCancelled(final CancelableProgressable reporter)
+    throws InterruptedIOException {
+    if (reporter == null) {
+      return;
+    }
+    if (!reporter.progress()) {
+      throw new InterruptedIOException("Operation cancelled");
+    }
+  }
+
+}
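
For orientation, a minimal caller-side sketch of how this utility is meant to be used before reading a possibly unclosed replication log (the log path below is illustrative; ReplicationLogProcessor in the next hunk is the real call site):

    // Hedged sketch: recover the HDFS lease on a replication log before opening it.
    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path logPath = new Path("/phoenix/replication/logs/example.plog"); // illustrative path
    RecoverLeaseFSUtils.recoverFileLease(fs, logPath, conf);           // no-op for non-HDFS filesystems
    // Once recovery completes, the file length is stable and a LogFileReader can be initialized safely.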
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index 7215ebe20e..be744a5f7e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -24,12 +24,14 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LeaseRecoverable;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.client.AsyncTable;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.phoenix.replication.log.InvalidLogTrailerException;
 import org.apache.phoenix.replication.log.LogFile;
 import org.apache.phoenix.replication.log.LogFileReader;
 import org.apache.phoenix.replication.log.LogFileReaderContext;
@@ -229,7 +232,15 @@ public class ReplicationLogProcessor implements Closeable {
 
     try {
       // Create the LogFileReader for given path
-      logFileReader = createLogFileReader(fs, filePath);
+      Optional<LogFileReader> logFileReaderOptional = createLogFileReader(fs, filePath);
+
+      if (!logFileReaderOptional.isPresent()) {
+        // This is an empty file, assume processed successfully and return
+        LOG.warn("Found empty file to process {}", filePath);
+        return;
+      }
+
+      logFileReader = logFileReaderOptional.get();
 
       for (LogFile.Record record : logFileReader) {
         final TableName tableName = TableName.valueOf(record.getHBaseTableName());
@@ -280,22 +291,58 @@ public class ReplicationLogProcessor implements Closeable {
    * @return A configured LogFileReader instance
    * @throws IOException if the file doesn't exist or initialization fails
    */
-  protected LogFileReader createLogFileReader(FileSystem fs, Path filePath) throws IOException {
+  protected Optional<LogFileReader> createLogFileReader(FileSystem fs, Path filePath)
+    throws IOException {
     // Ensure that file exists. If we face exception while checking the path itself,
     // method would throw same exception back to the caller
     if (!fs.exists(filePath)) {
       throw new IOException("Log file does not exist: " + filePath);
     }
     LogFileReader logFileReader = new LogFileReader();
-    try {
-      LogFileReaderContext logFileReaderContext =
-        new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath);
+    LogFileReaderContext logFileReaderContext =
+      new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath);
+    boolean isClosed = isFileClosed(fs, filePath);
+    if (isClosed) {
+      // As file is closed, ensure that the file has a valid header and trailer
       logFileReader.init(logFileReaderContext);
-    } catch (IOException exception) {
-      LOG.error("Failed to initialize new LogFileReader for path {}", 
filePath, exception);
-      throw exception;
+      return Optional.of(logFileReader);
+    } else {
+      LOG.warn("Found un-closed file {}. Starting lease recovery.", filePath);
+      recoverLease(fs, filePath);
+      if (fs.getFileStatus(filePath).getLen() <= 0) {
+        // Found empty file, returning null LogReader
+        return Optional.empty();
+      }
+      try {
+        // Acquired the lease, try to create the reader with validation of both header and trailer
+        logFileReader.init(logFileReaderContext);
+        return Optional.of(logFileReader);
+      } catch (InvalidLogTrailerException invalidLogTrailerException) {
+        // If trailer is missing or corrupt, create reader without trailer validation
+        LOG.warn("Invalid Trailer for file {}", filePath, invalidLogTrailerException);
+        logFileReaderContext.setValidateTrailer(false);
+        logFileReader.init(logFileReaderContext);
+        return Optional.of(logFileReader);
+      } catch (IOException exception) {
+        LOG.error("Failed to initialize new LogFileReader for path {}", 
filePath, exception);
+        throw exception;
+      }
+    }
+  }
+
+  protected void recoverLease(final FileSystem fs, final Path filePath) throws IOException {
+    RecoverLeaseFSUtils.recoverFileLease(fs, filePath, conf);
+  }
+
+  protected boolean isFileClosed(final FileSystem fs, final Path filePath) throws IOException {
+    boolean isClosed;
+    try {
+      isClosed = ((LeaseRecoverable) fs).isFileClosed(filePath);
+    } catch (ClassCastException classCastException) {
+      // If filesystem is not of type LeaseRecoverable, assume file is always closed
+      isClosed = true;
     }
-    return logFileReader;
+    return isClosed;
   }
 
   /**
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 82f44e28f8..03e4a62d09 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -132,6 +132,11 @@
       <artifactId>hbase-compression-aircompressor</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hbase</groupId>
+      <artifactId>hbase-asyncfs</artifactId>
+      <scope>test</scope>
+    </dependency>
 
     <!-- HBase Adjacent Dependencies -->
     <dependency>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index 062bef68bd..22075604ee 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -29,14 +29,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Random;
+import java.util.*;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -142,10 +135,13 @@ public class ReplicationLogProcessorTestIT extends ParallelStatsDisabledIT {
     // Test createLogFileReader with valid file - should succeed
     ReplicationLogProcessor replicationLogProcessor =
       new ReplicationLogProcessor(conf, testHAGroupName);
-    LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath);
+    Optional<LogFileReader> optionalLogFileReader =
+      replicationLogProcessor.createLogFileReader(localFs, validFilePath);
 
     // Verify reader is created successfully
-    assertNotNull("Reader should not be null for valid file", reader);
+    assertNotNull("Reader should not be null for valid file", 
optionalLogFileReader);
+    assertTrue("Reader should be present for valid file", 
optionalLogFileReader.isPresent());
+    LogFileReader reader = optionalLogFileReader.get();
     assertNotNull("Reader context should not be null", reader.getContext());
     assertEquals("File path should match", validFilePath, 
reader.getContext().getFilePath());
     assertEquals("File system should match", localFs, 
reader.getContext().getFileSystem());
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
new file mode 100644
index 0000000000..81bbbcd2bc
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static org.apache.phoenix.replication.reader.RecoverLeaseFSUtils.LEASE_RECOVERABLE_CLASS_NAME;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+/**
+ * Test our recoverLease loop against mocked up filesystem.
+ */
+public class RecoverLeaseFSUtilsTest extends ParallelStatsDisabledIT {
+
+  @ClassRule
+  public static TemporaryFolder testFolder = new TemporaryFolder();
+
+  private static Configuration conf;
+  private static FileSystem localFs;
+  private static Path FILE;
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    conf = getUtility().getConfiguration();
+    localFs = FileSystem.getLocal(conf);
+    conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND, 10);
+    conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND, 10);
+    FILE = new Path(testFolder.newFile("file.txt").toURI());
+  }
+
+  @AfterClass
+  public static void cleanUp() throws IOException {
+    localFs.delete(new Path(testFolder.getRoot().toURI()), true);
+  }
+
+  /**
+   * Test recover lease eventually succeeding.
+   */
+  @Test
+  public void testRecoverLease() throws IOException {
+    long startTime = EnvironmentEdgeManager.currentTime();
+    conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
+      1000);
+    CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class);
+    Mockito.when(reporter.progress()).thenReturn(true);
+    DistributedFileSystem dfs = Mockito.mock(DistributedFileSystem.class);
+    // Fail four times and pass on the fifth.
+    Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(false)
+      .thenReturn(false).thenReturn(true);
+    RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, conf, reporter);
+    Mockito.verify(dfs, Mockito.times(5)).recoverLease(FILE);
+    // Make sure we waited at least the lease recovery dfs timeout * 3 (the first two
+    // invocations will happen pretty fast... then we fall into the longer wait loop).
+    assertTrue((EnvironmentEdgeManager.currentTime() - startTime) > (3 * conf.getLong(
+      RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND, 61000)));
+  }
+
+  /**
+   * Test that we can use reflection to access LeaseRecoverable methods.
+   */
+  @Test
+  public void testLeaseRecoverable() throws IOException {
+    try {
+      // set LeaseRecoverable to FakeLeaseRecoverable for testing
+      RecoverLeaseFSUtils.initializeRecoverLeaseMethod(FakeLeaseRecoverable.class.getName());
+      RecoverableFileSystem mockFS = Mockito.mock(RecoverableFileSystem.class);
+      Mockito.when(mockFS.recoverLease(FILE)).thenReturn(true);
+      RecoverLeaseFSUtils.recoverFileLease(mockFS, FILE, conf);
+      Mockito.verify(mockFS, Mockito.times(1)).recoverLease(FILE);
+      assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(Mockito.mock(RecoverableFileSystem.class)));
+    } finally {
+      RecoverLeaseFSUtils.initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
+    }
+  }
+
+  /**
+   * Test that isFileClosed makes us recover lease faster.
+   */
+  @Test
+  public void testIsFileClosed() throws IOException {
+    // Make this time long so it is plain we broke out because of the isFileClosed invocation.
+    conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
+      100000);
+    CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class);
+    Mockito.when(reporter.progress()).thenReturn(true);
+    IsFileClosedDistributedFileSystem dfs = Mockito.mock(IsFileClosedDistributedFileSystem.class);
+    // Now make it so we fail the first two times -- the two fast invocations, then we fall into
+    // the long loop during which we will call isFileClosed.... the next invocation should
+    // therefore return true if we are to break the loop.
+    Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(true);
+    Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true);
+    RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, conf, reporter);
+    Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE);
+    Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
+  }
+
+  /**
+   * Test isLeaseRecoverable for both distributed and local FS
+   */
+  @Test
+  public void testIsLeaseRecoverable() {
+    assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(new DistributedFileSystem()));
+    assertFalse(RecoverLeaseFSUtils.isLeaseRecoverable(new LocalFileSystem()));
+  }
+
+  private interface FakeLeaseRecoverable {
+    @SuppressWarnings("unused")
+    boolean recoverLease(Path p) throws IOException;
+
+    @SuppressWarnings("unused")
+    boolean isFileClosed(Path p) throws IOException;
+  }
+
+  private static abstract class RecoverableFileSystem extends FileSystem
+    implements FakeLeaseRecoverable {
+    @Override
+    public boolean recoverLease(Path p) throws IOException {
+      return true;
+    }
+
+    @Override
+    public boolean isFileClosed(Path p) throws IOException {
+      return true;
+    }
+  }
+
+  /**
+   * Version of DFS that has HDFS-4525 in it.
+   */
+  private static class IsFileClosedDistributedFileSystem extends DistributedFileSystem {
+    /**
+     * Close status of a file. Copied over from HDFS-4525
+     * @return true if file is already closed
+     **/
+    @Override
+    public boolean isFileClosed(Path f) throws IOException {
+      return false;
+    }
+  }
+}
diff --git a/phoenix-mapreduce-byo-shaded-hbase/pom.xml b/phoenix-mapreduce-byo-shaded-hbase/pom.xml
index 477cd06345..c0a7eab48e 100644
--- a/phoenix-mapreduce-byo-shaded-hbase/pom.xml
+++ b/phoenix-mapreduce-byo-shaded-hbase/pom.xml
@@ -370,6 +370,12 @@
                 </excludes>
               </filter>
               <!-- Phoenix specific -->
+              <filter>
+                <artifact>*:*</artifact>
+                <excludes>
+                  <exclude>csv-bulk-load-config.properties</exclude>
+                </excludes>
+              </filter>
             </filters>
             <transformers>
               <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
