This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature-new
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature-new by
this push:
new aad16f60c7 Phoenix-7672 Handle Unclosed file via HDFS Lease Recovery
in ReplicationLogReplay (#2345)
aad16f60c7 is described below
commit aad16f60c795eda58a9d352c1e38452570638585
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Sat Jan 10 22:03:59 2026 +0530
Phoenix-7672 Handle Unclosed file via HDFS Lease Recovery in
ReplicationLogReplay (#2345)
---
 phoenix-client-parent/pom.xml                      |   6 +
 phoenix-core-server/pom.xml                        |   8 +
 .../replication/reader/RecoverLeaseFSUtils.java    | 320 +++++++++++++++++++++
 .../reader/ReplicationLogProcessor.java            |  65 ++++-
 phoenix-core/pom.xml                               |   5 +
 .../reader/ReplicationLogProcessorTestIT.java      |  16 +-
 .../reader/RecoverLeaseFSUtilsTest.java            | 170 +++++++++++
 phoenix-mapreduce-byo-shaded-hbase/pom.xml         |   6 +
 8 files changed, 577 insertions(+), 19 deletions(-)
diff --git a/phoenix-client-parent/pom.xml b/phoenix-client-parent/pom.xml
index 47b62405c3..02ff3d6272 100644
--- a/phoenix-client-parent/pom.xml
+++ b/phoenix-client-parent/pom.xml
@@ -110,6 +110,12 @@
<exclude>hbase-webapps/**</exclude>
</excludes>
</filter>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>csv-bulk-load-config.properties</exclude>
+ </excludes>
+ </filter>
<!-- Phoenix specific -->
</filters>
<transformers>
diff --git a/phoenix-core-server/pom.xml b/phoenix-core-server/pom.xml
index ef5140feb9..914a7078ee 100644
--- a/phoenix-core-server/pom.xml
+++ b/phoenix-core-server/pom.xml
@@ -57,6 +57,14 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ </dependency>
<!-- HBase dependencies -->
<dependency>
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java
new file mode 100644
index 0000000000..a4ad4aa061
--- /dev/null
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.LeaseRecoverable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for recovering file leases on HDFS.
+ */
+public class RecoverLeaseFSUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class);
+
+  /**
+   * The lease recovery timeout in milliseconds. Default is 15 minutes. It's huge, but the idea is
+   * that if we have a major issue, HDFS usually needs 10 minutes before marking the nodes as dead.
+   * So we're putting ourselves beyond that limit 'to be safe'.
+   */
+  public static final String REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND =
+    "phoenix.replication.replay.lease.recovery.timeout.millis";
+
+  public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND = 900000;
+
+  /**
+   * The first pause before retrying the lease recovery. This setting should be a little above what
+   * the cluster dfs heartbeat is set to.
+   */
+  public static final String REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND =
+    "phoenix.replication.replay.lease.recovery.first.pause.millis";
+
+  public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND =
+    4000;
+
+  /**
+   * The pause between subsequent retries of the lease recovery.
+   */
+  public static final String REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND =
+    "phoenix.replication.replay.lease.recovery.pause.millis";
+
+  public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND = 1000;
+
+  /**
+   * The dfs timeout during lease recovery in milliseconds. This should be set to how long it'll
+   * take for us to time out against the primary datanode if it is dead. We set it to 64 seconds,
+   * 4 seconds more than the default READ_TIMEOUT in HDFS, the default value for
+   * DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this timeout, then further
+   * recovery will use linear backoff with this base, to avoid endless preemptions when this value
+   * is not properly configured.
+   */
+  public static final String REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND =
+    "phoenix.replication.replay.lease.recovery.dfs.timeout.millis";
+
+  public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND =
+    64 * 1000;
+
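+  // Illustrative tuning example (hypothetical values, not defaults shipped with this change):
+  // the four knobs above are plain Configuration properties, so a deployment wanting a shorter
+  // recovery window could set, e.g.:
+  //   conf.setLong(REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND, 5 * 60 * 1000L);
+  //   conf.setLong(REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND, 3000L);
+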
+  private static Class<?> leaseRecoverableClazz = null;
+  private static Method recoverLeaseMethod = null;
+  public static final String LEASE_RECOVERABLE_CLASS_NAME =
+    LeaseRecoverable.class.getCanonicalName();
+
+  static {
+    LOG.debug("Initialize RecoverLeaseFSUtils");
+    initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
+  }
+
+  /**
+   * Initialize reflection classes and methods. If the LeaseRecoverable class is not found, look
+   * for the DistributedFileSystem#recoverLease method.
+   */
+  static void initializeRecoverLeaseMethod(String className) {
+    try {
+      leaseRecoverableClazz = Class.forName(className);
+      recoverLeaseMethod = leaseRecoverableClazz.getMethod("recoverLease", Path.class);
+      LOG.debug("set recoverLeaseMethod to " + className + ".recoverLease()");
+    } catch (ClassNotFoundException e) {
+      LOG.debug(
+        "LeaseRecoverable interface not in the classpath, this means Hadoop 3.3.5 or below.");
+      try {
+        recoverLeaseMethod = DistributedFileSystem.class.getMethod("recoverLease", Path.class);
+      } catch (NoSuchMethodException ex) {
+        LOG.error("Cannot find recoverLease method in DistributedFileSystem class. "
+          + "It should never happen. Abort.", ex);
+        throw new RuntimeException(ex);
+      }
+    } catch (NoSuchMethodException e) {
+      LOG.error("Cannot find recoverLease method in LeaseRecoverable class. "
+        + "It should never happen. Abort.", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  private RecoverLeaseFSUtils() {
+  }
+
+  public static void recoverFileLease(FileSystem fs, Path p, Configuration conf)
+    throws IOException {
+    recoverFileLease(fs, p, conf, null);
+  }
+
+  /**
+   * Recover the lease from the Hadoop file system, retrying multiple times.
+   */
+  public static void recoverFileLease(FileSystem fs, Path p, Configuration conf,
+    CancelableProgressable reporter) throws IOException {
+    if (fs instanceof FilterFileSystem) {
+      fs = ((FilterFileSystem) fs).getRawFileSystem();
+    }
+
+    // Lease recovery is not needed for the local file system case.
+    if (isLeaseRecoverable(fs)) {
+      recoverDFSFileLease(fs, p, conf, reporter);
+    }
+  }
+
+  public static boolean isLeaseRecoverable(FileSystem fs) {
+    // Return true if HDFS.
+    if (fs instanceof DistributedFileSystem) {
+      return true;
+    }
+    // Return true if the file system implements the LeaseRecoverable interface.
+    if (leaseRecoverableClazz != null) {
+      return leaseRecoverableClazz.isAssignableFrom(fs.getClass());
+    }
+    // Return false if the file system is not HDFS and does not implement LeaseRecoverable.
+    return false;
+  }
+
+  /**
+   * Run the dfs recover lease. recoverLease is asynchronous. It returns:
+   * - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
+   * - true when the lease recovery has succeeded or the file is closed.
+   * But, we have to be careful. Each time we call recoverLease, it starts the recover lease
+   * process over from the beginning. We could put ourselves in a situation where we are doing
+   * nothing but starting a recovery, interrupting it to start again, and so on. The findings over
+   * in HBASE-8354 have it that the namenode will try to recover the lease on the file's primary
+   * node. If all is well, it should return near immediately. But, as is common, it is the very
+   * primary node that has crashed and so the namenode will be stuck waiting on a socket timeout
+   * before it will ask another datanode to start the recovery. It does not help if we call
+   * recoverLease in the meantime and in particular, after the socket timeout, a recoverLease
+   * invocation will cause us to start over from square one (possibly waiting on socket timeout
+   * against the primary node). So, in the below, we do the following:
+   * 1. Call recoverLease.
+   * 2. If it returns true, break.
+   * 3. If it returns false, wait a few seconds and then call it again.
+   * 4. If it returns true, break.
+   * 5. If it returns false, wait for what we think the datanode socket timeout is (configurable)
+   *    and then try again.
+   * 6. If it returns true, break.
+   * 7. If it returns false, repeat starting at step 5 above.
+   * With the defaults this means attempt 0 waits 4 seconds, and each later attempt n spins for up
+   * to 64 * n seconds in 1 second increments. If HDFS-4525 is available, call isFileClosed every
+   * second during that spin, and we might be able to exit early.
+   */
+  private static boolean recoverDFSFileLease(final FileSystem dfs, final Path p,
+    final Configuration conf, final CancelableProgressable reporter) throws IOException {
+    LOG.info("Recover lease on dfs file " + p);
+
+    long startWaiting = EnvironmentEdgeManager.currentTime();
+    long recoveryTimeout = conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND,
+      DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND) + startWaiting;
+    long firstPause = conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND,
+      DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND);
+    long subsequentPauseBase =
+      conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
+        DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND);
+
+    Method isFileClosedMeth = null;
+    // Whether we need to look for the isFileClosed method.
+    boolean findIsFileClosedMeth = true;
+    boolean recovered = false;
+    // We break the loop if we succeed at the lease recovery, time out, or throw an exception.
+    for (int nbAttempt = 0; !recovered; nbAttempt++) {
+      recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
+      if (recovered) {
+        break;
+      }
+      checkIfCancelled(reporter);
+      if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) {
+        break;
+      }
+      try {
+        // On the first time through, wait the short 'firstPause'.
+        if (nbAttempt == 0) {
+          Thread.sleep(firstPause);
+        } else {
+          // Cycle here until (subsequentPauseBase * nbAttempt) elapses. While spinning, check
+          // isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though).
+          long localStartWaiting = EnvironmentEdgeManager.currentTime();
+          while (
+            (EnvironmentEdgeManager.currentTime() - localStartWaiting)
+                < subsequentPauseBase * nbAttempt
+          ) {
+            Thread.sleep(conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND,
+              DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND));
+            if (findIsFileClosedMeth) {
+              try {
+                isFileClosedMeth =
+                  dfs.getClass().getMethod("isFileClosed", new Class[] { Path.class });
+              } catch (NoSuchMethodException nsme) {
+                LOG.debug("isFileClosed not available");
+              } finally {
+                findIsFileClosedMeth = false;
+              }
+            }
+            if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) {
+              recovered = true;
+              break;
+            }
+            checkIfCancelled(reporter);
+          }
+        }
+      } catch (InterruptedException ie) {
+        InterruptedIOException iioe = new InterruptedIOException();
+        iioe.initCause(ie);
+        throw iioe;
+      }
+    }
+    return recovered;
+  }
+
+  private static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout,
+    final int nbAttempt, final Path p, final long startWaiting) {
+    if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) {
+      LOG.warn("Cannot recoverLease after trying for "
+        + conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND,
+          DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND)
+        + "ms " + REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND
+        + " continuing, but may be DATALOSS!!!; "
+        + getLogMessageDetail(nbAttempt, p, startWaiting));
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Try to recover the lease.
+   * @return True if dfs#recoverLease returned true.
+   */
+  private static boolean recoverLease(final FileSystem dfs, final int nbAttempt, final Path p,
+    final long startWaiting) throws FileNotFoundException {
+    boolean recovered = false;
+    try {
+      recovered = (Boolean) recoverLeaseMethod.invoke(dfs, p);
+      LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ")
+        + getLogMessageDetail(nbAttempt, p, startWaiting));
+    } catch (InvocationTargetException ite) {
+      final Throwable e = ite.getCause();
+      if (e instanceof LeaseExpiredException && e.getMessage().contains("File does not exist")) {
+        // This exception comes out instead of FNFE, fix it
+        throw new FileNotFoundException("The given replication log wasn't found at " + p);
+      } else if (e instanceof FileNotFoundException) {
+        throw (FileNotFoundException) e;
+      }
+      LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
+    } catch (IllegalAccessException e) {
+      LOG.error("Failed to call recoverLease on {}. Abort.", dfs, e);
+      throw new RuntimeException(e);
+    }
+    return recovered;
+  }
+
+  /**
+   * Returns detail to append to any log message around lease recovery.
+   */
+  private static String getLogMessageDetail(final int nbAttempt, final Path p,
+    final long startWaiting) {
+    return "attempt=" + nbAttempt + " on file=" + p + " after "
+      + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms";
+  }
+
+  /**
+   * Call HDFS-4525 isFileClosed if it is available.
+   * @return True if the file is closed.
+   */
+  private static boolean isFileClosed(final FileSystem dfs, final Method m, final Path p) {
+    try {
+      return (Boolean) m.invoke(dfs, p);
+    } catch (SecurityException e) {
+      LOG.warn("No access", e);
+    } catch (Exception e) {
+      LOG.warn("Failed invocation for " + p.toString(), e);
+    }
+    return false;
+  }
+
+  private static void checkIfCancelled(final CancelableProgressable reporter)
+    throws InterruptedIOException {
+    if (reporter == null) {
+      return;
+    }
+    if (!reporter.progress()) {
+      throw new InterruptedIOException("Operation cancelled");
+    }
+  }
+
+}
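
For a sense of how the utility above is driven, a minimal caller sketch (illustrative only:
the Configuration, FileSystem, and path here are hypothetical; the real caller is
ReplicationLogProcessor, shown in the next diff):

    Configuration conf = HBaseConfiguration.create();
    FileSystem fs = FileSystem.get(conf);
    Path log = new Path("/phoenix/replication/in/example.plog"); // hypothetical path
    // Retries with linear backoff until the lease is recovered, the configured
    // timeout elapses, or the operation is cancelled via a reporter.
    RecoverLeaseFSUtils.recoverFileLease(fs, log, conf);
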
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index 7215ebe20e..be744a5f7e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -24,12 +24,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LeaseRecoverable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -39,6 +41,7 @@ import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.phoenix.replication.log.InvalidLogTrailerException;
import org.apache.phoenix.replication.log.LogFile;
import org.apache.phoenix.replication.log.LogFileReader;
import org.apache.phoenix.replication.log.LogFileReaderContext;
@@ -229,7 +232,15 @@ public class ReplicationLogProcessor implements Closeable {
try {
// Create the LogFileReader for given path
- logFileReader = createLogFileReader(fs, filePath);
+      Optional<LogFileReader> logFileReaderOptional = createLogFileReader(fs, filePath);
+
+      if (!logFileReaderOptional.isPresent()) {
+        // This is an empty file, assume processed successfully and return
+        LOG.warn("Found empty file to process {}", filePath);
+        return;
+      }
+
+      logFileReader = logFileReaderOptional.get();
       for (LogFile.Record record : logFileReader) {
         final TableName tableName = TableName.valueOf(record.getHBaseTableName());
@@ -280,22 +291,58 @@ public class ReplicationLogProcessor implements Closeable {
* @return A configured LogFileReader instance
* @throws IOException if the file doesn't exist or initialization fails
*/
-  protected LogFileReader createLogFileReader(FileSystem fs, Path filePath) throws IOException {
+  protected Optional<LogFileReader> createLogFileReader(FileSystem fs, Path filePath)
+    throws IOException {
     // Ensure that the file exists. If we face an exception while checking the path itself,
     // the method would throw the same exception back to the caller.
     if (!fs.exists(filePath)) {
       throw new IOException("Log file does not exist: " + filePath);
     }
     LogFileReader logFileReader = new LogFileReader();
-    try {
-      LogFileReaderContext logFileReaderContext =
-        new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath);
+    LogFileReaderContext logFileReaderContext =
+      new LogFileReaderContext(conf).setFileSystem(fs).setFilePath(filePath);
+    boolean isClosed = isFileClosed(fs, filePath);
+    if (isClosed) {
+      // As the file is closed, ensure that it has a valid header and trailer
       logFileReader.init(logFileReaderContext);
-    } catch (IOException exception) {
-      LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception);
-      throw exception;
+      return Optional.of(logFileReader);
+    } else {
+      LOG.warn("Found un-closed file {}. Starting lease recovery.", filePath);
+      recoverLease(fs, filePath);
+      if (fs.getFileStatus(filePath).getLen() <= 0) {
+        // Found an empty file, return an empty Optional
+        return Optional.empty();
+      }
+      try {
+        // Acquired the lease, try to create the reader, validating both header and trailer
+        logFileReader.init(logFileReaderContext);
+        return Optional.of(logFileReader);
+      } catch (InvalidLogTrailerException invalidLogTrailerException) {
+        // If the trailer is missing or corrupt, create the reader without trailer validation
+        LOG.warn("Invalid trailer for file {}", filePath, invalidLogTrailerException);
+        logFileReaderContext.setValidateTrailer(false);
+        logFileReader.init(logFileReaderContext);
+        return Optional.of(logFileReader);
+      } catch (IOException exception) {
+        LOG.error("Failed to initialize new LogFileReader for path {}", filePath, exception);
+        throw exception;
+      }
+    }
+  }
+
+  protected void recoverLease(final FileSystem fs, final Path filePath) throws IOException {
+    RecoverLeaseFSUtils.recoverFileLease(fs, filePath, conf);
+  }
+
+  protected boolean isFileClosed(final FileSystem fs, final Path filePath) throws IOException {
+    boolean isClosed;
+    try {
+      isClosed = ((LeaseRecoverable) fs).isFileClosed(filePath);
+    } catch (ClassCastException classCastException) {
+      // If the filesystem is not LeaseRecoverable, assume the file is always closed
+      isClosed = true;
+    }
-    return logFileReader;
+    return isClosed;
+  }
/**
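
A condensed sketch of how a caller consumes the new Optional-returning createLogFileReader
(illustrative only; this simplifies the processLogFile flow from the diff above, and the
record-handling body is elided):

    Optional<LogFileReader> readerOpt = createLogFileReader(fs, filePath);
    if (!readerOpt.isPresent()) {
      // Empty, lease-recovered file: nothing to replay.
      return;
    }
    LogFileReader reader = readerOpt.get();
    for (LogFile.Record record : reader) {
      // Group mutations by table and apply them, as processLogFile does.
    }
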
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index 82f44e28f8..03e4a62d09 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -132,6 +132,11 @@
<artifactId>hbase-compression-aircompressor</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-asyncfs</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- HBase Adjacent Dependencies -->
<dependency>
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
index 062bef68bd..22075604ee 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTestIT.java
@@ -29,14 +29,15 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.sql.Connection;
 import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Properties;
 import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
@@ -142,10 +143,13 @@ public class ReplicationLogProcessorTestIT extends ParallelStatsDisabledIT {
     // Test createLogFileReader with valid file - should succeed
     ReplicationLogProcessor replicationLogProcessor =
       new ReplicationLogProcessor(conf, testHAGroupName);
-    LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath);
+    Optional<LogFileReader> optionalLogFileReader =
+      replicationLogProcessor.createLogFileReader(localFs, validFilePath);
     // Verify reader is created successfully
-    assertNotNull("Reader should not be null for valid file", reader);
+    assertNotNull("Reader should not be null for valid file", optionalLogFileReader);
+    assertTrue("Reader should be present for valid file", optionalLogFileReader.isPresent());
+    LogFileReader reader = optionalLogFileReader.get();
     assertNotNull("Reader context should not be null", reader.getContext());
     assertEquals("File path should match", validFilePath, reader.getContext().getFilePath());
     assertEquals("File system should match", localFs, reader.getContext().getFileSystem());
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
new file mode 100644
index 0000000000..81bbbcd2bc
--- /dev/null
+++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static org.apache.phoenix.replication.reader.RecoverLeaseFSUtils.LEASE_RECOVERABLE_CLASS_NAME;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+/**
+ * Test our recoverLease loop against a mocked-up filesystem.
+ */
+public class RecoverLeaseFSUtilsTest extends ParallelStatsDisabledIT {
+
+ @ClassRule
+ public static TemporaryFolder testFolder = new TemporaryFolder();
+
+ private static Configuration conf;
+ private static FileSystem localFs;
+ private static Path FILE;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ conf = getUtility().getConfiguration();
+ localFs = FileSystem.getLocal(conf);
+    conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND,
+      10);
+    conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND, 10);
+ FILE = new Path(testFolder.newFile("file.txt").toURI());
+ }
+
+ @AfterClass
+ public static void cleanUp() throws IOException {
+ localFs.delete(new Path(testFolder.getRoot().toURI()), true);
+ }
+
+ /**
+ * Test recover lease eventually succeeding.
+ */
+ @Test
+ public void testRecoverLease() throws IOException {
+    long startTime = EnvironmentEdgeManager.currentTime();
+    conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
+      1000);
+    CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class);
+    Mockito.when(reporter.progress()).thenReturn(true);
+    DistributedFileSystem dfs = Mockito.mock(DistributedFileSystem.class);
+    // Fail four times and pass on the fifth.
+    Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(false)
+      .thenReturn(false).thenReturn(true);
+    RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, conf, reporter);
+    Mockito.verify(dfs, Mockito.times(5)).recoverLease(FILE);
+    // Make sure we waited at least the configured dfs timeout * 3 (the first two
+    // invocations will happen pretty fast... then we fall into the longer wait loop).
+    assertTrue((EnvironmentEdgeManager.currentTime() - startTime) > (3 * conf.getLong(
+      RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND, 61000)));
+  }
+
+ /**
+ * Test that we can use reflection to access LeaseRecoverable methods.
+ */
+ @Test
+ public void testLeaseRecoverable() throws IOException {
+    try {
+      // Set LeaseRecoverable to FakeLeaseRecoverable for testing.
+      RecoverLeaseFSUtils.initializeRecoverLeaseMethod(FakeLeaseRecoverable.class.getName());
+      RecoverableFileSystem mockFS = Mockito.mock(RecoverableFileSystem.class);
+      Mockito.when(mockFS.recoverLease(FILE)).thenReturn(true);
+      RecoverLeaseFSUtils.recoverFileLease(mockFS, FILE, conf);
+      Mockito.verify(mockFS, Mockito.times(1)).recoverLease(FILE);
+      assertTrue(
+        RecoverLeaseFSUtils.isLeaseRecoverable(Mockito.mock(RecoverableFileSystem.class)));
+    } finally {
+      RecoverLeaseFSUtils.initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
+    }
+  }
+
+ /**
+ * Test that isFileClosed makes us recover lease faster.
+ */
+ @Test
+ public void testIsFileClosed() throws IOException {
+    // Make this time long so it is plain we broke out because of the isFileClosed invocation.
+    conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
+      100000);
+    CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class);
+    Mockito.when(reporter.progress()).thenReturn(true);
+    IsFileClosedDistributedFileSystem dfs = Mockito.mock(IsFileClosedDistributedFileSystem.class);
+    // Now make it so we fail the first two times -- the two fast invocations, then we fall into
+    // the long loop during which we will call isFileClosed.... the next invocation should
+    // therefore return true if we are to break the loop.
+    Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(true);
+    Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true);
+    RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, conf, reporter);
+    Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE);
+    Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
+  }
+
+ /**
+ * Test isLeaseRecoverable for both distributed and local FS
+ */
+ @Test
+ public void testIsLeaseRecoverable() {
+    assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(new DistributedFileSystem()));
+    assertFalse(RecoverLeaseFSUtils.isLeaseRecoverable(new LocalFileSystem()));
+  }
+
+ private interface FakeLeaseRecoverable {
+ @SuppressWarnings("unused")
+ boolean recoverLease(Path p) throws IOException;
+
+ @SuppressWarnings("unused")
+ boolean isFileClosed(Path p) throws IOException;
+ }
+
+ private static abstract class RecoverableFileSystem extends FileSystem
+ implements FakeLeaseRecoverable {
+ @Override
+ public boolean recoverLease(Path p) throws IOException {
+ return true;
+ }
+
+ @Override
+ public boolean isFileClosed(Path p) throws IOException {
+ return true;
+ }
+ }
+
+ /**
+ * Version of DFS that has HDFS-4525 in it.
+ */
+  private static class IsFileClosedDistributedFileSystem extends DistributedFileSystem {
+    /**
+     * Close status of a file. Copied over from HDFS-4525.
+     * @return true if file is already closed
+     */
+    @Override
+    public boolean isFileClosed(Path f) throws IOException {
+      return false;
+    }
+ }
+}
diff --git a/phoenix-mapreduce-byo-shaded-hbase/pom.xml b/phoenix-mapreduce-byo-shaded-hbase/pom.xml
index 477cd06345..c0a7eab48e 100644
--- a/phoenix-mapreduce-byo-shaded-hbase/pom.xml
+++ b/phoenix-mapreduce-byo-shaded-hbase/pom.xml
@@ -370,6 +370,12 @@
</excludes>
</filter>
<!-- Phoenix specific -->
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>csv-bulk-load-config.properties</exclude>
+ </excludes>
+ </filter>
</filters>
<transformers>
        <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>