Copilot commented on code in PR #7300:
URL: https://github.com/apache/hbase/pull/7300#discussion_r2372999441


##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/util/BackupFileSystemManager.java:
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.util;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.backup.replication.Utils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Initializes and organizes backup directories for continuous Write-Ahead 
Logs (WALs) and
+ * bulk-loaded files within the specified backup root directory.
+ */
+@InterfaceAudience.Private
+public class BackupFileSystemManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BackupFileSystemManager.class);
+
+  public static final String WALS_DIR = "WALs";
+  public static final String BULKLOAD_FILES_DIR = "bulk-load-files";
+  private final String peerId;
+  private final FileSystem backupFs;
+  private final Path backupRootDir;
+  private final Path walsDir;
+  private final Path bulkLoadFilesDir;
+
+  public BackupFileSystemManager(String peerId, Configuration conf, String 
backupRootDirStr)
+    throws IOException {
+    this.peerId = peerId;
+    this.backupRootDir = new Path(backupRootDirStr);
+    this.backupFs = FileSystem.get(backupRootDir.toUri(), conf);
+    this.walsDir = createDirectory(WALS_DIR);
+    this.bulkLoadFilesDir = createDirectory(BULKLOAD_FILES_DIR);
+  }
+
+  private Path createDirectory(String dirName) throws IOException {
+    Path dirPath = new Path(backupRootDir, dirName);
+    backupFs.mkdirs(dirPath);
+    LOG.info("{} Initialized directory: {}", Utils.logPeerId(peerId), dirPath);
+    return dirPath;
+  }
+
+  public Path getWalsDir() {
+    return walsDir;
+  }
+
+  public Path getBulkLoadFilesDir() {
+    return bulkLoadFilesDir;
+  }
+
+  public FileSystem getBackupFs() {
+    return backupFs;
+  }
+
+  public static final class WalPathInfo {
+    private final Path prefixBeforeWALs;
+    private final String dateSegment;
+
+    public WalPathInfo(Path prefixBeforeWALs, String dateSegment) {
+      this.prefixBeforeWALs = prefixBeforeWALs;
+      this.dateSegment = dateSegment;
+    }
+
+    public Path getPrefixBeforeWALs() {
+      return prefixBeforeWALs;
+    }
+
+    public String getDateSegment() {
+      return dateSegment;
+    }
+  }
+
+  /**
+   * Validate the walPath has the expected structure: 
.../WALs/<date>/<wal-file> and return
+   * WalPathInfo(prefixBeforeWALs, dateSegment).
+   * @throws IOException if the path is not in expected format
+   */
+  public static WalPathInfo extractWalPathInfo(Path walPath) throws 
IOException {
+    if (walPath == null) {
+      throw new IllegalArgumentException("walPath must not be null");
+    }
+
+    Path dateDir = walPath.getParent(); // .../WALs/<date>
+    if (dateDir == null) {
+      throw new IOException("Invalid WAL path: missing date directory. Path: " 
+ walPath);
+    }
+
+    Path walsDir = dateDir.getParent(); // .../WALs
+    if (walsDir == null) {
+      throw new IOException("Invalid WAL path: missing WALs directory. Path: " 
+ walPath);
+    }
+
+    String walsDirName = walsDir.getName();
+    if (!WALS_DIR.equals(walsDirName)) {
+      throw new IOException("Invalid WAL path: expected '" + WALS_DIR + "' 
segment but found '"
+        + walsDirName + "'. Path: " + walPath);
+    }
+
+    String dateSegment = dateDir.getName();
+    if (dateSegment == null || dateSegment.isEmpty()) {
+      throw new IOException("Invalid WAL path: date segment is empty. Path: " 
+ walPath);
+    }
+
+    Path prefixBeforeWALs = walsDir.getParent(); // might be null if path is 
like "/WALs/..."
+    return new WalPathInfo(prefixBeforeWALs, dateSegment);
+  }
+
+  /**
+   * Resolve the full bulk-load file path corresponding to a relative 
bulk-load path referenced from
+   * a WAL file path. For a WAL path like: 
/some/prefix/.../WALs/23-08-2025/some-wal-file and a
+   * relative bulk path like: namespace/table/region/family/file, this returns:
+   * 
/some/prefix/.../bulk-load-files/23-08-2025/namespace/table/region/family/file
+   * @param walPath          the Path to the WAL file (must contain the {@link 
#WALS_DIR} segment
+   *                         followed by date)
+   * @param relativeBulkPath the relative bulk-load file Path
+   * @return resolved full Path for the bulk-load file
+   * @throws IOException if the WAL path does not contain the expected segments
+   */
+  public static Path resolveBulkLoadFullPath(Path walPath, Path 
relativeBulkPath)
+    throws IOException {
+    WalPathInfo info = extractWalPathInfo(walPath);
+
+    Path prefixBeforeWALs = info.getPrefixBeforeWALs();
+    String dateSegment = info.getDateSegment();
+
+    Path full; // Build final path:
+               // 
<prefixBeforeWALs>/bulk-load-files/<dateSegment>/<relativeBulkPath>
+    if (prefixBeforeWALs == null || prefixBeforeWALs.toString().isEmpty()) {

Review Comment:
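   The null check already comes first and `||` short-circuits, so `toString()` 
is never called on a null `prefixBeforeWALs`; the condition as written cannot 
throw a NullPointerException. The suggestion below is behaviorally equivalent 
and only makes the null guard explicit, so applying it is optional.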
   ```suggestion
       if (prefixBeforeWALs == null || (prefixBeforeWALs != null && 
prefixBeforeWALs.toString().isEmpty())) {
   ```



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/mapreduce/BulkLoadCollectorJob.java:
##########
@@ -0,0 +1,399 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup.mapreduce;
+
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.TABLES_KEY;
+import static org.apache.hadoop.hbase.mapreduce.WALPlayer.TABLE_MAP_KEY;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.util.BackupFileSystemManager;
+import org.apache.hadoop.hbase.backup.util.BulkLoadProcessor;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.WALInputFormat;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * MapReduce job that scans WAL backups and extracts referenced bulk-load 
store-file paths.
+ * <p>
+ * This job is intended to be used when you want a list of HFiles / 
store-files referenced by WAL
+ * bulk-load descriptors. It emits a de-duplicated list of full paths (one per 
line) by default
+ * using the {@link DedupReducer}.
+ * </p>
+ * <p>
+ * Usage (CLI):
+ * {@code BulkLoadCollector <WAL inputdir> <bulk-files-output-dir> [<tables> 
[<tableMappings>]]}
+ * </p>
+ */
+@InterfaceAudience.Private
+public class BulkLoadCollectorJob extends Configured implements Tool {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkLoadCollectorJob.class);
+
+  public static final String NAME = "BulkLoadCollector";
+  public static final String DEFAULT_REDUCERS = "1";
+
+  public BulkLoadCollectorJob() {
+  }
+
+  protected BulkLoadCollectorJob(final Configuration c) {
+    super(c);
+  }
+
+  /**
+   * Mapper that extracts relative bulk-load paths from a WAL entry (via 
{@code BulkLoadProcessor}),
+   * resolves them to full paths (via
+   * {@code BackupFileSystemManager#resolveBulkLoadFullPath(Path, Path)}), and 
emits each full path
+   * as the map key (Text). Uses the same table-filtering semantics as other 
WAL mappers: if no
+   * tables are configured, all tables are processed; otherwise only the 
configured table set is
+   * processed. Map output: (Text fullPathString, NullWritable)
+   */
+  public static class BulkLoadCollectorMapper extends Mapper<WALKey, WALEdit, 
Text, NullWritable> {
+    private final Map<TableName, TableName> tables = new TreeMap<>();
+    private final Text out = new Text();
+
+    @Override
+    protected void map(WALKey key, WALEdit value, Context context)
+      throws IOException, InterruptedException {
+      if (key == null) {
+        if (LOG.isTraceEnabled()) LOG.trace("map: received null WALKey, 
skipping");
+        return;
+      }
+      if (value == null) {
+        if (LOG.isTraceEnabled())
+          LOG.trace("map: received null WALEdit for table={}, skipping", 
safeTable(key));
+        return;
+      }
+
+      TableName tname = key.getTableName();
+
+      // table filtering
+      if (!(tables.isEmpty() || tables.containsKey(tname))) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("map: skipping table={} because it is not in configured 
table list", tname);
+        }
+        return;
+      }
+
+      // Extract relative store-file paths referenced by this WALEdit.
+      // Delegates parsing to BulkLoadProcessor so parsing logic is 
centralized.
+      List<Path> relativePaths = BulkLoadProcessor.processBulkLoadFiles(key, 
value);
+      if (relativePaths.isEmpty()) return;
+
+      // Determine WAL input path for this split (used to compute date/prefix 
for full path)
+      Path walInputPath;
+      try {
+        walInputPath =
+          new Path(((WALInputFormat.WALSplit) 
context.getInputSplit()).getLogFileName());
+      } catch (ClassCastException cce) {
+        String splitClass =
+          (context.getInputSplit() == null) ? "null" : 
context.getInputSplit().getClass().getName();
+        LOG.warn(
+          "map: unexpected InputSplit type (not WALSplit) - cannot determine 
WAL input path; context.getInputSplit() class={}",
+          splitClass, cce);
+        throw new IOException("Unexpected InputSplit type: expected WALSplit 
but got " + splitClass,
+          cce);
+      }
+
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("map: walInputPath={} table={} relativePathsCount={}", 
walInputPath, tname,
+          relativePaths.size());
+      }
+
+      // Build full path for each relative path and emit it.
+      for (Path rel : relativePaths) {
+        Path full = 
BackupFileSystemManager.resolveBulkLoadFullPath(walInputPath, rel);
+        out.set(full.toString());
+        context.write(out, NullWritable.get());
+        context.getCounter("BulkCollector", "StoreFilesEmitted").increment(1);
+      }
+    }
+
+    @Override
+    protected void setup(Context context) throws IOException {
+      String[] tableMap = context.getConfiguration().getStrings(TABLES_KEY);
+      String[] tablesToUse = context.getConfiguration().getStrings(TABLES_KEY);
+      if (tableMap == null) {
+        tableMap = tablesToUse;
+      }

Review Comment:
   Lines 153 and 154 both retrieve `TABLES_KEY`, so `tableMap` and 
`tablesToUse` are always identical and the `tableMap == null` fallback is 
redundant. The `tableMap` lookup on line 153 should retrieve `TABLE_MAP_KEY` 
instead of `TABLES_KEY` to get the table mappings.



##########
hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/impl/AbstractPitrRestoreHandler.java:
##########
@@ -356,7 +438,7 @@ private List<String> getValidWalDirs(Configuration conf, 
Path walBackupDir, long
           validDirs.add(dayDir.getPath().toString());
         }
       } catch (ParseException e) {
-        LOG.warn("Skipping invalid directory name: " + dirName, e);
+        LOG.warn("Skipping invalid directory name: {}", dirName, e);

Review Comment:
   [nitpick] This change from concatenated string to parameterized logging is 
good, but the comment on line 440 should be updated to reflect the corrected 
logging style for consistency.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to