tkhurana commented on code in PR #2278: URL: https://github.com/apache/phoenix/pull/2278#discussion_r2449879481
########## phoenix-core-server/src/main/java/org/apache/phoenix/replication/ReplicationLogTracker.java: ########## @@ -0,0 +1,518 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.replication; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.stream.Collectors; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.phoenix.replication.metrics.MetricsReplicationLogTracker; +import org.apache.phoenix.util.EnvironmentEdgeManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is responsible to track and managing replication log files across different states. + * Handles file lifecycle management including new files, in-progress files, and completed files. + */ +public class ReplicationLogTracker { + + private static final Logger LOG = LoggerFactory.getLogger(ReplicationLogTracker.class); + + /** + * Configuration key for number of retries when deleting files + */ + private static final String FILE_DELETE_RETRIES_KEY = "phoenix.replication.file.delete.retries"; + + /** + * Default number of retries for file deletion operations + */ + private static final int DEFAULT_FILE_DELETE_RETRIES = 3; + + /** + * Configuration key for delay between file deletion retry attempts + */ + private static final String FILE_DELETE_RETRY_DELAY_MS_KEY = + "phoenix.replication.file.delete.retry.delay.ms"; + + /** + * Default delay in milliseconds between file deletion retry attempts + */ + private static final long DEFAULT_FILE_DELETE_RETRY_DELAY_MS = 1000L; + + private final URI rootURI; + private final DirectoryType directoryType; + private final FileSystem fileSystem; + private Path inProgressDirPath; + private ReplicationShardDirectoryManager replicationShardDirectoryManager; + protected final Configuration conf; + protected final String haGroupName; + protected MetricsReplicationLogTracker metrics; + + public ReplicationLogTracker(final Configuration conf, final String haGroupName, + final FileSystem fileSystem, final URI rootURI, final DirectoryType directoryType, + final MetricsReplicationLogTracker metrics) { + this.conf = conf; + this.fileSystem = fileSystem; + this.haGroupName = haGroupName; + this.rootURI = rootURI; + this.directoryType = directoryType; + this.metrics = metrics; + } + + protected String getNewLogSubDirectoryName() { + return this.directoryType.getName(); + } + + protected MetricsReplicationLogTracker getMetricsSource() { + return this.metrics; + } + + protected String getInProgressLogSubDirectoryName() { + return getNewLogSubDirectoryName() + "_progress"; + } + + /** + * Initializes the file tracker by setting up new files directory and in-progress directory. + * Creates the in-progress directory (rootURI/<group-name>/[in/out]_progress) if it doesn't + * exist. + */ + public void init() throws IOException { + Path newFilesDirectory = new Path(new Path(rootURI.getPath(), haGroupName), + getNewLogSubDirectoryName()); + this.replicationShardDirectoryManager = new ReplicationShardDirectoryManager(conf, + newFilesDirectory); + this.inProgressDirPath = new Path(new Path(rootURI.getPath(), haGroupName), + getInProgressLogSubDirectoryName()); + createDirectoryIfNotExists(inProgressDirPath); + } + + public void close() { + if (this.metrics != null) { + this.metrics.close(); + } + } + + /** + * Retrieves new replication log files that belong to a specific replication round. It skips + * the invalid files (if any) + * @param replicationRound - The replication round for which to retrieve files + * @return List of valid log file paths that belong to the specified replication round + * @throws IOException if there's an error accessing the file system + */ + protected List<Path> getNewFilesForRound(ReplicationRound replicationRound) throws IOException { + Path roundDirectory = replicationShardDirectoryManager.getShardDirectory(replicationRound); + LOG.info("Getting new files for round {} from shard {}", replicationRound, roundDirectory); + if (!fileSystem.exists(roundDirectory)) { + return Collections.emptyList(); + } + + // List the files in roundDirectory + FileStatus[] fileStatuses = fileSystem.listStatus(roundDirectory); + LOG.info("Number of new files found {}", fileStatuses.length); + List<Path> filesInRound = new ArrayList<>(); + + // Filter the files belonging to current round + for (FileStatus status : fileStatuses) { + if (status.isFile()) { + if (!isValidLogFile(status.getPath())) { + LOG.warn("Invalid log file found at {}", status.getPath()); + continue; // Skip invalid files + } + try { + long fileTimestamp = getFileTimestamp(status.getPath()); + if (fileTimestamp >= replicationRound.getStartTime() + && fileTimestamp <= replicationRound.getEndTime()) { + filesInRound.add(status.getPath()); + } + } catch (NumberFormatException exception) { + // Should we throw an exception here instead? + LOG.warn("Failed to extract timestamp from {}. Ignoring the file.", + status.getPath()); + } + } + } + return filesInRound; + } + + /** + * Retrieves all valid log files currently in the in-progress directory. + * @return List of valid log file paths in the in-progress directory, empty list if directory + * doesn't exist + * @throws IOException if there's an error accessing the file system + */ + public List<Path> getInProgressFiles() throws IOException { + if (!fileSystem.exists(getInProgressDirPath())) { + return Collections.emptyList(); + } + + FileStatus[] fileStatuses = fileSystem.listStatus(getInProgressDirPath()); + List<Path> inProgressFiles = new ArrayList<>(); + + for (FileStatus status : fileStatuses) { + if (status.isFile() && isValidLogFile(status.getPath())) { + inProgressFiles.add(status.getPath()); + } + } + + return inProgressFiles; + } + + /** + * Retrieves all valid log files in the in-progress directory that are older than the + * specified timestamp. + * @param timestampThreshold - The timestamp threshold in milliseconds. Files with timestamps + * less than this value will be returned. + * @return List of valid log file paths in the in-progress directory that are older than the + * threshold, empty list if directory doesn't exist or no files match + * @throws IOException if there's an error accessing the file system + */ + public List<Path> getOlderInProgressFiles(long timestampThreshold) throws IOException { + if (!fileSystem.exists(getInProgressDirPath())) { + return Collections.emptyList(); + } + + FileStatus[] fileStatuses = fileSystem.listStatus(getInProgressDirPath()); + List<Path> olderInProgressFiles = new ArrayList<>(); + + for (FileStatus status : fileStatuses) { + if (status.isFile() && isValidLogFile(status.getPath())) { + try { + long fileTimestamp = getFileTimestamp(status.getPath()); + if (fileTimestamp < timestampThreshold) { + olderInProgressFiles.add(status.getPath()); + } + } catch (NumberFormatException e) { + LOG.warn("Failed to extract timestamp from file {}, skipping", + status.getPath().getName()); + } + } + } + + LOG.debug("Found {} in-progress files older than timestamp {}", + olderInProgressFiles.size(), timestampThreshold); + return olderInProgressFiles; + } + + /** + * Retrieves all valid log files from all shard directories. + * @return List of all valid log file paths from all shard directories + * @throws IOException if there's an error accessing the file system + */ + public List<Path> getNewFiles() throws IOException { + List<Path> shardPaths = replicationShardDirectoryManager.getAllShardPaths(); + List<Path> newFiles = new ArrayList<>(); + for (Path shardPath : shardPaths) { + if (fileSystem.exists(shardPath)) { + FileStatus[] fileStatuses = fileSystem.listStatus(shardPath); + for (FileStatus fileStatus : fileStatuses) { + if (fileStatus.isFile() && isValidLogFile(fileStatus.getPath())) { + newFiles.add(fileStatus.getPath()); + } + } + } + } + return newFiles; + } + + /** + * Marks a file as completed by deleting it from the file system. Uses retry logic with + * configurable retry count and delay. During retry attempts, it fetches the file with same + * prefix (instead of re-using the same file) because it would likely be re-named by some + * other process + * @param file - The file path to mark as completed + * @return true if file was successfully deleted, false otherwise + */ + protected boolean markCompleted(final Path file) { + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + // Increment the metrics count + getMetrics().incrementMarkFileCompletedRequestCount(); + + int maxRetries = conf.getInt(FILE_DELETE_RETRIES_KEY, DEFAULT_FILE_DELETE_RETRIES); + long retryDelayMs = conf.getLong(FILE_DELETE_RETRY_DELAY_MS_KEY, + DEFAULT_FILE_DELETE_RETRY_DELAY_MS); + + Path fileToDelete = file; + final String filePrefix = getFilePrefix(fileToDelete); + + for (int attempt = 0; attempt <= maxRetries; attempt++) { + try { + if (fileSystem.delete(fileToDelete, false)) { + LOG.info("Successfully deleted completed file: {}", fileToDelete); + long endTime = EnvironmentEdgeManager.currentTimeMillis(); + getMetrics().updateMarkFileCompletedTime(endTime - startTime); + return true; + } else { + LOG.warn("Failed to delete file (attempt {}): {}", attempt + 1, fileToDelete); + } + } catch (IOException e) { + LOG.warn("IOException while deleting file (attempt {}): {}", attempt + 1, + fileToDelete, e); + } + + // Increment the deletion failure count + getMetrics().incrementMarkFileCompletedRequestFailedCount(); + + // If deletion failed and it's not the last attempt, sleep first then try to find + // matching in-progress file + if (attempt < maxRetries) { + // Sleep before next retry + try { + Thread.sleep(retryDelayMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while waiting to retry file deletion: {}", file); + return false; + } + + try { + // List in-progress files and find matching file with same + // <timestamp>_<region-server> prefix + List<Path> inProgressFiles = getInProgressFiles(); + List<Path> matchingFiles = inProgressFiles.stream() + .filter(path -> getFilePrefix(path).equals(filePrefix)) + .collect(Collectors.toList()); + // Assert only single file exists with that prefix + if (matchingFiles.size() == 1) { + Path matchingFile = matchingFiles.get(0); + LOG.info("Found matching in-progress file: {} for original file: {}", + matchingFile, file); + // Update fileToDelete to the matching file for subsequent retries + fileToDelete = matchingFile; + } else if (matchingFiles.size() > 1) { + LOG.warn("Multiple matching in-progress files found for prefix {}: {}", + filePrefix, matchingFiles.size()); + long endTime = EnvironmentEdgeManager.currentTimeMillis(); + getMetrics().updateMarkFileCompletedTime(endTime - startTime); + return false; + } else { + LOG.warn("No matching in-progress file found for prefix: {}. File must " + + "have " + "been deleted by some other process.", filePrefix); + long endTime = EnvironmentEdgeManager.currentTimeMillis(); + getMetrics().updateMarkFileCompletedTime(endTime - startTime); + return true; + } + } catch (IOException e) { + LOG.warn("IOException while searching for matching in-progress file " + + "(attempt {}): " + "{}", attempt + 1, file, e); + } + } + } + + long endTime = EnvironmentEdgeManager.currentTimeMillis(); + getMetrics().updateMarkFileCompletedTime(endTime - startTime); + + LOG.error("Failed to delete file after {} attempts: {}", maxRetries + 1, fileToDelete); + return false; + } + + /** + * Marks a file as in-progress by renaming it with a UUID and moving to in-progress directory. + * If file is already in in-progress directory, only updates the UUID. + * @param file - The file path to mark as in progress + * @return Optional value of renamed path if file rename was successful, else Optional.empty() + */ + protected Optional<Path> markInProgress(final Path file) { + long startTime = EnvironmentEdgeManager.currentTimeMillis(); + try { + + final String fileName = file.getName(); + final String newFileName; + final Path targetDirectory; + + // Check if file is already in in-progress directory + if (file.getParent().toUri().getPath().equals(getInProgressDirPath().toString())) { + // File is already in in-progress directory, replace UUID with a new one + // keep the directory same as in progress + String[] parts = fileName.split("_"); + // Remove the last part (UUID) and add new UUID + StringBuilder newNameBuilder = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + if (i > 0) { + newNameBuilder.append("_"); + } + newNameBuilder.append(parts[i]); + } + String extension = fileName.substring(fileName.lastIndexOf(".")); + newNameBuilder.append("_").append(UUID.randomUUID().toString()).append(extension); + newFileName = newNameBuilder.toString(); + targetDirectory = file.getParent(); + } else { + // File is not in in-progress directory, add UUID and move to IN_PROGRESS directory + String baseName = fileName.substring(0, fileName.lastIndexOf(".")); + String extension = fileName.substring(fileName.lastIndexOf(".")); + newFileName = baseName + "_" + UUID.randomUUID().toString() + extension; + targetDirectory = getInProgressDirPath(); + } + + Path newPath = new Path(targetDirectory, newFileName); + if (fileSystem.rename(file, newPath)) { + LOG.debug("Successfully marked file as in progress: {} -> {}", file.getName(), + newFileName); + return Optional.of(newPath); + } else { + LOG.warn("Failed to rename file for in-progress marking: {}", file); + return Optional.empty(); + } + } catch (IOException e) { + LOG.error("IOException while marking file as in progress: {}", file, e); + return Optional.empty(); + } finally { + // Update the metrics + getMetrics().incrementMarkFileInProgressRequestCount(); + long endTime = EnvironmentEdgeManager.currentTimeMillis(); + getMetrics().updateMarkFileInProgressTime(endTime - startTime); + } + } + + /** + * Validates if a file is a valid log file by checking if it ends with ".plog" extension. + * @param file - The file path to validate. + * @return true if file format is correct, else false + */ + protected boolean isValidLogFile(Path file) { + final String fileName = file.getName(); + return fileName.endsWith(".plog"); + } + + /** + * Extracts the timestamp from a log file name. + * Assumes timestamp is the first part of the filename separated by underscore. + * @param file - The file path to extract timestamp from. + * return the timestamp from the file name + */ + public long getFileTimestamp(Path file) throws NumberFormatException { + String[] parts = file.getName().split("_"); + return Long.parseLong(parts[0]); + } + + /** + * Extracts the UUID from a log file name. + * Assumes UUID is the last part of the filename before the extension. + * @param file - The file path to extract UUID from. + * @return Optional of UUID if file was in progress, else Optional.empty() + */ + protected Optional<String> getFileUUID(Path file) throws NumberFormatException { + String[] parts = file.getName().split("_"); + if (parts.length < 3) { + return Optional.empty(); + } + return Optional.of(parts[parts.length - 1].split("\\.")[0]); + } + + /** + * Extracts everything except the UUID (last part) from a file path. + * For example, from "1704153600000_rs1_12345678-1234-1234-1234-123456789abc.plog" + * This method will return "1704153600000_rs1" + * @param file - The file path to extract prefix from. + */ + protected String getFilePrefix(Path file) { + String fileName = file.getName(); + String[] parts = fileName.split("_"); + if (parts.length < 3) { + return fileName.split("\\.")[0]; // Return full filename if no underscore found + } + + // Return everything except the last part (UUID) + StringBuilder prefix = new StringBuilder(); + for (int i = 0; i < parts.length - 1; i++) { + if (i > 0) { + prefix.append("_"); + } + prefix.append(parts[i]); + } + + return prefix.toString(); + } + + /** + * No op implementation for marking a file as failed as its expected to be picked by + * by other process from in-progress directory + * @param file - The file which needs to be marked as failed + * @return - true if marked as failed, false otherwise + */ + public boolean markFailed(final Path file) { + getMetrics().incrementMarkFileFailedRequestCount(); + return true; + } + + public FileSystem getFileSystem() { + return this.fileSystem; + } + + public ReplicationShardDirectoryManager getReplicationShardDirectoryManager() { + return this.replicationShardDirectoryManager; + } + + protected String getHaGroupName() { + return this.haGroupName; + } + + protected Configuration getConf() { + return this.conf; + } + + protected Path getInProgressDirPath() { + return this.inProgressDirPath; + } + + protected MetricsReplicationLogTracker getMetrics() { + return this.metrics; + } + + /** + * Creates a directory if it doesn't exist. + */ + private void createDirectoryIfNotExists(Path directoryPath) throws IOException { + if (!fileSystem.exists(directoryPath)) { + LOG.info("Creating directory {}", directoryPath); + if (!fileSystem.mkdirs(directoryPath)) { + throw new IOException("Failed to create directory: " + directoryPath); + } + } + } + + /** + * Enum representing the type of replication log directory. + * IN: Directory created on standby cluster for Incoming replication log files + * OUT: Directory created on primary cluster for Outgoing replication log files + */ + public enum DirectoryType { + IN("in"), + OUT("out"); + + private final String name; + + DirectoryType(final String name) { + this.name = name; + } + + public String getName() { + return this.name; + } + } Review Comment: I think we should get rid of this enum. It is not really an enum. What if we use a different terminology. These are just string constants but they don't need to be defined here but at a higher level. The ReplicationLogTracker doesn't need to know about it. It should take a path as input and just work with it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
