This is an automated email from the ASF dual-hosted git repository. tkhurana pushed a commit to branch PHOENIX-7562-feature in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this push: new 77b679ce91 Phoenix-7672 Handle Unclosed file via HDFS Lease Recovery in ReplicationLogReplay 77b679ce91 is described below commit 77b679ce91cf48abb57f034d3f5aa16f6a6c8f7f Author: Himanshu Gwalani <hgwalan...@gmail.com> AuthorDate: Fri Sep 19 02:50:27 2025 +0530 Phoenix-7672 Handle Unclosed file via HDFS Lease Recovery in ReplicationLogReplay --- phoenix-client-parent/pom.xml | 6 + phoenix-core-server/pom.xml | 8 + .../replication/reader/RecoverLeaseFSUtils.java | 333 +++++++++++++++++++++ .../reader/ReplicationLogProcessor.java | 82 ++++- phoenix-core/pom.xml | 5 + .../reader/RecoverLeaseFSUtilsTest.java | 169 +++++++++++ .../reader/ReplicationLogProcessorTest.java | 7 +- phoenix-mapreduce-byo-shaded-hbase/pom.xml | 6 + 8 files changed, 598 insertions(+), 18 deletions(-) diff --git a/phoenix-client-parent/pom.xml b/phoenix-client-parent/pom.xml index 0b3f15b8df..3cef52db14 100644 --- a/phoenix-client-parent/pom.xml +++ b/phoenix-client-parent/pom.xml @@ -112,6 +112,12 @@ <exclude>hbase-webapps/**</exclude> </excludes> </filter> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>csv-bulk-load-config.properties</exclude> + </excludes> + </filter> <!-- Phoenix specific --> </filters> <transformers> diff --git a/phoenix-core-server/pom.xml b/phoenix-core-server/pom.xml index 42fb746f65..7f74c6eda1 100644 --- a/phoenix-core-server/pom.xml +++ b/phoenix-core-server/pom.xml @@ -59,6 +59,14 @@ <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-mapreduce-client-core</artifactId> </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs-client</artifactId> + </dependency> <!-- HBase dependencies --> <dependency> diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java new file mode 100644 index 0000000000..23adb8509d --- /dev/null +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.replication.reader; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FilterFileSystem; +import org.apache.hadoop.fs.LeaseRecoverable; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility methods for recovering file leases on HDFS. + */ +public class RecoverLeaseFSUtils { + + private static final Logger LOG = LoggerFactory.getLogger(RecoverLeaseFSUtils.class); + + /** + * The lease recovery timeout in milliseconds. + * + * Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS + * usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves + * beyond that limit 'to be safe'. + */ + public static final String REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND = + "phoenix.replication.replay.lease.recovery.timeout.millis"; + + public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND = 900000; + + /** + * The first pause before retrying the lease recovery. + * This setting should be a little above what the cluster dfs heartbeat is set to. + */ + public static final String REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND = + "phoenix.replication.replay.lease.recovery.first.pause.millis"; + + public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND = + 4000; + + /** + * The pause between subsequent retries of the lease recovery. + */ + public static final String REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND = + "phoenix.replication.replay.lease.recovery.pause.millis"; + + public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND = 1000; + + /** + * The dfs timeout during lease recovery in milliseconds. + * + * This should be set to how long it'll take for us to timeout against primary datanode if it + * is dead. We set it to 64 seconds, 4 seconds more than the default READ_TIMEOUT in HDFS, the + * default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this + * timeout, then further recovery will take linear backoff with this base, to avoid endless + * preemptions when this value is not properly configured. + */ + public static final String REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND = + "phoenix.replication.replay.lease.recovery.dfs.timeout.millis"; + + public static final long DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND = + 64 * 1000; + + + private static Class<?> leaseRecoverableClazz = null; + private static Method recoverLeaseMethod = null; + public static final String LEASE_RECOVERABLE_CLASS_NAME = + LeaseRecoverable.class.getCanonicalName(); + static { + LOG.debug("Initialize RecoverLeaseFSUtils"); + initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME); + } + + /** + * Initialize reflection classes and methods. If LeaseRecoverable class is not found, look for + * DistributedFileSystem#recoverLease method.
+ */ + static void initializeRecoverLeaseMethod(String className) { + try { + leaseRecoverableClazz = Class.forName(className); + recoverLeaseMethod = leaseRecoverableClazz.getMethod("recoverLease", Path.class); + LOG.debug("set recoverLeaseMethod to " + className + ".recoverLease()"); + } catch (ClassNotFoundException e) { + LOG.debug("LeaseRecoverable interface not in the classpath, " + + "this means Hadoop 3.3.5 or below."); + try { + recoverLeaseMethod = DistributedFileSystem.class.getMethod("recoverLease", + Path.class); + } catch (NoSuchMethodException ex) { + LOG.error("Cannot find recoverLease method in DistributedFileSystem class. " + + "It should never happen. Abort.", ex); + throw new RuntimeException(ex); + } + } catch (NoSuchMethodException e) { + LOG.error("Cannot find recoverLease method in LeaseRecoverable class. " + + "It should never happen. Abort.", e); + throw new RuntimeException(e); + } + } + + private RecoverLeaseFSUtils() { + } + + public static void recoverFileLease(FileSystem fs, Path p, Configuration conf) + throws IOException { + recoverFileLease(fs, p, conf, null); + } + + /** + * Recover the lease from Hadoop file system, retrying multiple times. + */ + public static void recoverFileLease(FileSystem fs, Path p, Configuration conf, + CancelableProgressable reporter) throws IOException { + if (fs instanceof FilterFileSystem) { + fs = ((FilterFileSystem) fs).getRawFileSystem(); + } + + // lease recovery not needed for local file system case. + if (isLeaseRecoverable(fs)) { + recoverDFSFileLease(fs, p, conf, reporter); + } + } + + public static boolean isLeaseRecoverable(FileSystem fs) { + // return true if HDFS. + if (fs instanceof DistributedFileSystem) { + return true; + } + // return true if the file system implements LeaseRecoverable interface. + if (leaseRecoverableClazz != null) { + return leaseRecoverableClazz.isAssignableFrom(fs.getClass()); + } + // return false if the file system is not HDFS and does not implement LeaseRecoverable. + return false; + } + + /** + * Run the dfs recover lease. recoverLease is asynchronous. It returns: false when it starts + * the lease recovery (i.e. lease recovery not *yet* done); true when the lease recovery + * has succeeded or the file is closed. But, we have to be careful. Each time we call + * recoverLease, it starts the recover lease process over from the beginning. + * We could put ourselves in a situation where we are doing nothing but starting a recovery, + * interrupting it to start again, and so on. + * The findings over in HBASE-8354 have it that the namenode will try to recover the lease on + * the file's primary node. If all is well, it should return near immediately. But, as is + * common, it is the very primary node that has crashed and so the namenode will be stuck + * waiting on a socket timeout before it will ask another datanode to start the recovery. + * It does not help if we call recoverLease in the meantime and in particular, after the + * socket timeout, a recoverLease invocation will cause us to start over from square one + * (possibly waiting on socket timeout against primary node). So, in the below, we do the + * following: 1. Call recoverLease. 2. If it returns true, break. 3. If it returns false, + * wait a few seconds and then call it again. 4. If it returns true, break. 5. If it returns + * false, wait for what we think the datanode socket timeout is (configurable) and then try + * again. 6. If it returns true, break. 7. If it returns false, repeat starting at step 5 + * above.
If HDFS-4525 is available, call it every second, and we might be able to exit early. + */ + private static boolean recoverDFSFileLease(final FileSystem dfs, final Path p, + final Configuration conf, + final CancelableProgressable reporter) + throws IOException { + LOG.info("Recover lease on dfs file " + p); + + long startWaiting = EnvironmentEdgeManager.currentTime(); + long recoveryTimeout = conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND, + DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND) + startWaiting; + long firstPause = conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND, + DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND); + long subsequentPauseBase = conf.getLong( + REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND, + DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND); + + Method isFileClosedMeth = null; + // whether we need to look for isFileClosed method + boolean findIsFileClosedMeth = true; + boolean recovered = false; + // We break the loop if we succeed in the lease recovery, time out, or throw an exception. + for (int nbAttempt = 0; !recovered; nbAttempt++) { + recovered = recoverLease(dfs, nbAttempt, p, startWaiting); + if (recovered) { + break; + } + checkIfCancelled(reporter); + if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p, startWaiting)) { + break; + } + try { + // On the first time through wait the short 'firstPause'. + if (nbAttempt == 0) { + Thread.sleep(firstPause); + } else { + // Cycle here until (subsequentPauseBase * nbAttempt) elapses. While spinning, + // check isFileClosed if available (should be in hadoop 2.0.5... + // not in hadoop 1 though). + long localStartWaiting = EnvironmentEdgeManager.currentTime(); + while ( + (EnvironmentEdgeManager.currentTime() - localStartWaiting) + < subsequentPauseBase * nbAttempt + ) { + Thread.sleep(conf.getLong( + REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND, + DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND)); + if (findIsFileClosedMeth) { + try { + isFileClosedMeth = dfs.getClass().getMethod("isFileClosed", + new Class[] { Path.class }); + } catch (NoSuchMethodException nsme) { + LOG.debug("isFileClosed not available"); + } finally { + findIsFileClosedMeth = false; + } + } + if (isFileClosedMeth != null && isFileClosed(dfs, isFileClosedMeth, p)) { + recovered = true; + break; + } + checkIfCancelled(reporter); + } + } + } catch (InterruptedException ie) { + InterruptedIOException iioe = new InterruptedIOException(); + iioe.initCause(ie); + throw iioe; + } + } + return recovered; + } + + private static boolean checkIfTimedout(final Configuration conf, final long recoveryTimeout, + final int nbAttempt, final Path p, + final long startWaiting) { + if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) { + LOG.warn("Cannot recoverLease after trying for " + + conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND, + DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND) + + "ms " + REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND + + " continuing, but may be DATALOSS!!!; " + + getLogMessageDetail(nbAttempt, p, startWaiting)); + return true; + } + return false; + } + + /** + * Try to recover the lease. + * @return True if dfs#recoverLease returned true.
+ */ + private static boolean recoverLease(final FileSystem dfs, final int nbAttempt, final Path p, + final long startWaiting) throws FileNotFoundException { + boolean recovered = false; + try { + recovered = (Boolean) recoverLeaseMethod.invoke(dfs, p); + LOG.info((recovered ? "Recovered lease, " : "Failed to recover lease, ") + + getLogMessageDetail(nbAttempt, p, startWaiting)); + } catch (InvocationTargetException ite) { + final Throwable e = ite.getCause(); + if (e instanceof LeaseExpiredException + && e.getMessage().contains("File does not exist")) { + // This exception comes out instead of FNFE, fix it + throw new FileNotFoundException("The given replication log wasn't found at " + p); + } else if (e instanceof FileNotFoundException) { + throw (FileNotFoundException) e; + } + LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e); + } catch (IllegalAccessException e) { + LOG.error("Failed to call recoverLease on {}. Abort.", dfs, e); + throw new RuntimeException(e); + } + return recovered; + } + + /** + * Returns detail to append to any log message around lease recovery. + */ + private static String getLogMessageDetail(final int nbAttempt, final Path p, + final long startWaiting) { + return "attempt=" + nbAttempt + " on file=" + p + " after " + + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms"; + } + + /** + * Call HDFS-4525 isFileClosed if it is available. + * @return True if file is closed. + */ + private static boolean isFileClosed(final FileSystem dfs, final Method m, final Path p) { + try { + return (Boolean) m.invoke(dfs, p); + } catch (SecurityException e) { + LOG.warn("No access", e); + } catch (Exception e) { + LOG.warn("Failed invocation for " + p.toString(), e); + } + return false; + } + + private static void checkIfCancelled(final CancelableProgressable reporter) + throws InterruptedIOException { + if (reporter == null) { + return; + } + if (!reporter.progress()) { + throw new InterruptedIOException("Operation cancelled"); + } + } + +} diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java index 73dbfdbdf9..4a47779d70 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java @@ -24,6 +24,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -31,6 +32,7 @@ import java.util.concurrent.Future; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LeaseRecoverable; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.client.AsyncTable; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.phoenix.replication.log.InvalidLogTrailerException; import org.apache.phoenix.replication.log.LogFile; import org.apache.phoenix.replication.log.LogFileReader; import org.apache.phoenix.replication.log.LogFileReaderContext; @@ -168,7 +171,7 @@
public class ReplicationLogProcessor implements Closeable { */ public static ReplicationLogProcessor get(Configuration conf, String haGroupName) { return INSTANCES.computeIfAbsent(haGroupName, - k -> new ReplicationLogProcessor(conf, haGroupName)); + k -> new ReplicationLogProcessor(conf, haGroupName)); } /** @@ -194,9 +197,9 @@ public class ReplicationLogProcessor implements Closeable { decorateConf(); this.metrics = createMetricsSource(); this.executorService = Executors.newFixedThreadPool(threadPoolSize, - new ThreadFactoryBuilder() - .setNameFormat("Phoenix-Replication-Log-Processor-" + haGroupName + "-%d") - .build()); + new ThreadFactoryBuilder() + .setNameFormat("Phoenix-Replication-Log-Processor-" + haGroupName + "-%d") + .build()); } /** @@ -233,7 +236,15 @@ public class ReplicationLogProcessor implements Closeable { try { // Create the LogFileReader for given path - logFileReader = createLogFileReader(fs, filePath); + Optional<LogFileReader> logFileReaderOptional = createLogFileReader(fs, filePath); + + if (!logFileReaderOptional.isPresent()) { + // This is an empty file, assume processed successfully and return + LOG.warn("Found empty file to process {}", filePath); + return; + } + + logFileReader = logFileReaderOptional.get(); for (LogFile.Record record : logFileReader) { final TableName tableName = TableName.valueOf(record.getHBaseTableName()); @@ -287,23 +298,60 @@ public class ReplicationLogProcessor implements Closeable { * @return A configured LogFileReader instance * @throws IOException if the file doesn't exist or initialization fails */ - protected LogFileReader createLogFileReader(FileSystem fs, Path filePath) throws IOException { + protected Optional<LogFileReader> createLogFileReader(FileSystem fs, Path filePath) + throws IOException { // Ensure that file exists. If we face exception while checking the path itself, // method would throw same exception back to the caller if (!fs.exists(filePath)) { throw new IOException("Log file does not exist: " + filePath); } LogFileReader logFileReader = new LogFileReader(); - try { - LogFileReaderContext logFileReaderContext = new LogFileReaderContext(conf) - .setFileSystem(fs).setFilePath(filePath); + LogFileReaderContext logFileReaderContext = new LogFileReaderContext(conf) + .setFileSystem(fs).setFilePath(filePath); + boolean isClosed = isFileClosed(fs, filePath); + if (isClosed) { + // As file is closed, ensure that the file has a valid header and trailer logFileReader.init(logFileReaderContext); - } catch (IOException exception) { - LOG.error("Failed to initialize new LogFileReader for path {}", - filePath, exception); - throw exception; + return Optional.of(logFileReader); + } else { + LOG.warn("Found un-closed file {}. 
Starting lease recovery.", filePath); + recoverLease(fs, filePath); + if (fs.getFileStatus(filePath).getLen() <= 0) { + // Found empty file, returning null LogReader + return Optional.empty(); + } + try { + // Acquired the lease, try to create reader with validation both header and trailer + logFileReader.init(logFileReaderContext); + return Optional.of(logFileReader); + } catch (InvalidLogTrailerException invalidLogTrailerException) { + // If trailer is missing or corrupt, create reader without trailer validation + LOG.warn("Invalid Trailer for file {}", + filePath, invalidLogTrailerException); + logFileReaderContext.setValidateTrailer(false); + logFileReader.init(logFileReaderContext); + return Optional.of(logFileReader); + } catch (IOException exception) { + LOG.error("Failed to initialize new LogFileReader for path {}", + filePath, exception); + throw exception; + } + } + } + + protected void recoverLease(final FileSystem fs, final Path filePath) throws IOException { + RecoverLeaseFSUtils.recoverFileLease(fs, filePath, conf); + } + + protected boolean isFileClosed(final FileSystem fs, final Path filePath) throws IOException { + boolean isClosed; + try { + isClosed = ((LeaseRecoverable) fs).isFileClosed(filePath); + } catch (ClassCastException classCastException) { + // If filesystem is not of type LeaseRecoverable, assume file is always closed + isClosed = true; } - return logFileReader; + return isClosed; } /** @@ -353,7 +401,8 @@ public class ReplicationLogProcessor implements Closeable { // Update current operations for next retry currentOperations = result.getFailedMutations(); - lastError = new IOException("Failed to apply the mutations", result.getException()); + lastError = new IOException("Failed to apply the mutations", + result.getException()); } catch (IOException e) { lastError = e; } @@ -427,7 +476,8 @@ public class ReplicationLogProcessor implements Closeable { // Add failed mutations to retry list failedOperations.put(tableName, tableMutationMap.get(tableName)); getMetrics().incrementFailedMutationsCount(tableMutationMap.get(tableName).size()); - LOG.debug("Failed to apply mutations for table {}: {}", tableName, e.getMessage()); + LOG.debug("Failed to apply mutations for table {}: {}", tableName, + e.getMessage()); lastException = e; } } diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index c049ab4331..1287d72f5f 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -244,6 +244,11 @@ <artifactId>hbase-compression-aircompressor</artifactId> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-asyncfs</artifactId> + <scope>test</scope> + </dependency> <!-- HBase Adjacent Dependencies --> <dependency> diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java new file mode 100644 index 0000000000..6df8577095 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication.reader; + +import static org.apache.phoenix.replication.reader.RecoverLeaseFSUtils.LEASE_RECOVERABLE_CLASS_NAME; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.LocalFileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.phoenix.end2end.ParallelStatsDisabledIT; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; + +/** + * Test our recoverLease loop against a mocked-up filesystem. + */ +public class RecoverLeaseFSUtilsTest extends ParallelStatsDisabledIT { + + @ClassRule + public static TemporaryFolder testFolder = new TemporaryFolder(); + + private static Configuration conf; + private static FileSystem localFs; + private static Path FILE; + + @BeforeClass + public static void setupBeforeClass() throws Exception { + conf = getUtility().getConfiguration(); + localFs = FileSystem.getLocal(conf); + conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND, 10); + conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND, 10); + FILE = new Path(testFolder.newFile("file.txt").toURI()); + } + + @AfterClass + public static void cleanUp() throws IOException { + localFs.delete(new Path(testFolder.getRoot().toURI()), true); + } + + /** + * Test recover lease eventually succeeding. + */ + @Test + public void testRecoverLease() throws IOException { + long startTime = EnvironmentEdgeManager.currentTime(); + conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND, 1000); + CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class); + Mockito.when(reporter.progress()).thenReturn(true); + DistributedFileSystem dfs = Mockito.mock(DistributedFileSystem.class); + // Fail four times and pass on the fifth. + Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(false) + .thenReturn(false).thenReturn(true); + RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, conf, reporter); + Mockito.verify(dfs, Mockito.times(5)).recoverLease(FILE); + // Make sure we waited at least REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND * 3 (the first two + // invocations will happen pretty fast... then we fall into the longer wait loop). + assertTrue((EnvironmentEdgeManager.currentTime() - startTime) + > (3 * conf.getLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND, 61000))); + } + + /** + * Test that we can use reflection to access LeaseRecoverable methods.
+ */ + @Test + public void testLeaseRecoverable() throws IOException { + try { + // set LeaseRecoverable to FakeLeaseRecoverable for testing + RecoverLeaseFSUtils.initializeRecoverLeaseMethod(FakeLeaseRecoverable.class.getName()); + RecoverableFileSystem mockFS = Mockito.mock(RecoverableFileSystem.class); + Mockito.when(mockFS.recoverLease(FILE)).thenReturn(true); + RecoverLeaseFSUtils.recoverFileLease(mockFS, FILE, conf); + Mockito.verify(mockFS, Mockito.times(1)).recoverLease(FILE); + assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(Mockito.mock(RecoverableFileSystem.class))); + } finally { + RecoverLeaseFSUtils.initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME); + } + } + + /** + * Test that isFileClosed makes us recover lease faster. + */ + @Test + public void testIsFileClosed() throws IOException { + // Make this time long so it is plain we broke out because of the isFileClosed invocation. + conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND, 100000); + CancelableProgressable reporter = Mockito.mock(CancelableProgressable.class); + Mockito.when(reporter.progress()).thenReturn(true); + IsFileClosedDistributedFileSystem dfs = Mockito.mock(IsFileClosedDistributedFileSystem.class); + // Now make it so we fail the first two times -- the two fast invocations, then we fall into + // the long loop during which we will call isFileClosed.... the next invocation should + // therefore return true if we are to break the loop. + Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(true); + Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true); + RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, conf, reporter); + Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE); + Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE); + } + + /** + * Test isLeaseRecoverable for both distributed and local FS + */ + @Test + public void testIsLeaseRecoverable() { + assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(new DistributedFileSystem())); + assertFalse(RecoverLeaseFSUtils.isLeaseRecoverable(new LocalFileSystem())); + } + + private interface FakeLeaseRecoverable { + @SuppressWarnings("unused") + boolean recoverLease(Path p) throws IOException; + + @SuppressWarnings("unused") + boolean isFileClosed(Path p) throws IOException; + } + + private static abstract class RecoverableFileSystem extends FileSystem + implements FakeLeaseRecoverable { + @Override + public boolean recoverLease(Path p) throws IOException { + return true; + } + + @Override + public boolean isFileClosed(Path p) throws IOException { + return true; + } + } + + /** + * Version of DFS that has HDFS-4525 in it. + */ + private static class IsFileClosedDistributedFileSystem extends DistributedFileSystem { + /** + * Close status of a file. 
Copied over from HDFS-4525 + * @return true if file is already closed + **/ + @Override + public boolean isFileClosed(Path f) throws IOException { + return false; + } + } +} diff --git a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java index 2bbc18a35d..7d1ce01705 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java @@ -35,6 +35,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Properties; import java.util.Random; import java.util.concurrent.ExecutorService; @@ -138,10 +139,12 @@ public class ReplicationLogProcessorTest extends ParallelStatsDisabledIT { // Test createLogFileReader with valid file - should succeed ReplicationLogProcessor replicationLogProcessor = new ReplicationLogProcessor(conf, testHAGroupName); - LogFileReader reader = replicationLogProcessor.createLogFileReader(localFs, validFilePath); + Optional<LogFileReader> optionalLogFileReader = replicationLogProcessor.createLogFileReader(localFs, validFilePath); // Verify reader is created successfully - assertNotNull("Reader should not be null for valid file", reader); + assertNotNull("Reader should not be null for valid file", optionalLogFileReader); + assertTrue("Reader should be present for valid file", optionalLogFileReader.isPresent()); + LogFileReader reader = optionalLogFileReader.get(); assertNotNull("Reader context should not be null", reader.getContext()); assertEquals("File path should match", validFilePath, reader.getContext().getFilePath()); assertEquals("File system should match", localFs, reader.getContext().getFileSystem()); diff --git a/phoenix-mapreduce-byo-shaded-hbase/pom.xml b/phoenix-mapreduce-byo-shaded-hbase/pom.xml index d98b42160d..077d91691c 100644 --- a/phoenix-mapreduce-byo-shaded-hbase/pom.xml +++ b/phoenix-mapreduce-byo-shaded-hbase/pom.xml @@ -120,6 +120,12 @@ </excludes> </filter> <!-- Phoenix specific --> + <filter> + <artifact>*:*</artifact> + <excludes> + <exclude>csv-bulk-load-config.properties</exclude> + </excludes> + </filter> </filters> <transformers> <transformer
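For context, a minimal sketch of how the new utility is meant to be driven before a replication log is read. It relies only on the RecoverLeaseFSUtils API and configuration keys added in this commit; the wrapper class, the log path, and the timeout override below are illustrative assumptions, not part of the change.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.phoenix.replication.reader.RecoverLeaseFSUtils;

    public class LeaseRecoverySketch {
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            // Illustrative override: cap the whole recovery loop at 5 minutes instead of
            // the 15-minute default.
            conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND,
                5 * 60 * 1000L);
            Path log = new Path("hdfs:///phoenix/replication/sample.plog"); // illustrative path
            FileSystem fs = log.getFileSystem(conf);
            // No-op for file systems that are not lease recoverable (e.g. LocalFileSystem).
            // On HDFS this blocks, retrying recoverLease on the firstPause / linear-backoff
            // schedule described in the javadoc, until the lease is recovered, the file is
            // reported closed, or the configured timeout elapses.
            RecoverLeaseFSUtils.recoverFileLease(fs, log, conf);
            // The file can now be opened, e.g. by ReplicationLogProcessor#createLogFileReader,
            // which additionally falls back to skipping trailer validation for unclosed logs.
        }
    }

The retry schedule follows the HBASE-8354 findings cited in the javadoc: a short first pause (4 s by default) so a healthy namenode can finish quickly, then waits of subsequentPauseBase * nbAttempt (64 s base by default) so repeated recoverLease calls do not keep preempting an in-progress recovery, while isFileClosed is polled every second to exit early.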