vinayakphegde commented on code in PR #6523:
URL: https://github.com/apache/hbase/pull/6523#discussion_r1875449325


##########
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java:
##########
@@ -156,7 +164,7 @@ protected static enum Counter {
    * A mapper that writes out {@link Mutation} to be directly applied to a running HBase instance.
    */
   protected static class WALMapper
-    extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Mutation> {
+    extends Mapper<WALKey, WALEdit, ImmutableBytesWritable, Pair<Mutation, List<String>>> {

Review Comment:
   Instead of `Pair` here, can we use a custom class, so that the exclusivity 
between `Mutation` and the bulk load files is enforced programmatically?
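
   A minimal sketch of what such a class could look like. The class name, the factory method names, and the type parameter `M` (standing in for `Mutation` so the snippet compiles without HBase) are all hypothetical, and how the value is serialized as map output is not addressed here:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * Hypothetical holder for the mapper's output value: exactly one of a mutation
 * or a list of bulk load file paths. M stands in for HBase's Mutation so the
 * sketch compiles without HBase on the classpath.
 */
final class MutationOrBulkLoadFiles<M> {
  private final M mutation;                 // non-null only in the mutation case
  private final List<String> bulkLoadFiles; // non-null only in the bulk load case

  // Private constructor: instances are created only through the two factories,
  // so the two fields can never both be set.
  private MutationOrBulkLoadFiles(M mutation, List<String> bulkLoadFiles) {
    this.mutation = mutation;
    this.bulkLoadFiles = bulkLoadFiles;
  }

  static <M> MutationOrBulkLoadFiles<M> ofMutation(M mutation) {
    return new MutationOrBulkLoadFiles<>(Objects.requireNonNull(mutation, "mutation"), null);
  }

  static <M> MutationOrBulkLoadFiles<M> ofBulkLoadFiles(List<String> files) {
    Objects.requireNonNull(files, "files");
    // Defensive copy so later changes to the caller's list do not leak in.
    return new MutationOrBulkLoadFiles<>(null,
      Collections.unmodifiableList(new ArrayList<>(files)));
  }

  boolean isMutation() {
    return mutation != null;
  }

  M getMutation() {
    if (mutation == null) {
      throw new IllegalStateException("holds bulk load files, not a mutation");
    }
    return mutation;
  }

  List<String> getBulkLoadFiles() {
    if (bulkLoadFiles == null) {
      throw new IllegalStateException("holds a mutation, not bulk load files");
    }
    return bulkLoadFiles;
  }
}
```

   With something along these lines, a consumer has to branch on `isMutation()` and cannot accidentally read both sides, which a `Pair` allows.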



##########
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java:
##########
@@ -172,6 +180,52 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
           ExtendedCell lastCell = null;
           for (ExtendedCell cell : WALEditInternalHelper.getExtendedCells(value)) {
             context.getCounter(Counter.CELLS_READ).increment(1);
+
+            if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+              String namespace = key.getTableName().getNamespaceAsString();
+              String tableName = key.getTableName().getQualifierAsString();
+              LOG.info("Processing bulk load for namespace: {}, table: {}", namespace, tableName);
+
+              List<String> bulkloadFiles = handleBulkLoadCell(cell);
+              LOG.info("Found {} bulk load files for table: {}", bulkloadFiles.size(), tableName);
+
+              // Prefix each file path with namespace and table name to construct the full paths
+              List<String> bulkloadFilesWithFullPath = bulkloadFiles.stream()
+                .map(filePath -> new Path(namespace, new Path(tableName, filePath)).toString())
+                .collect(Collectors.toList());
+              LOG.info("Bulk load files with full paths: {}", bulkloadFilesWithFullPath.size());
+
+              // Retrieve configuration and set up file systems for backup and staging locations
+              Configuration conf = context.getConfiguration();
+              Path backupLocation = new Path(conf.get(BULKLOAD_BACKUP_LOCATION));
+              FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf); // HDFS filesystem
+              Path hbaseStagingDir = new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
+              FileSystem backupFs = FileSystem.get(backupLocation.toUri(), conf);
+
+              List<String> stagingPaths = new ArrayList<>();
+
+              try {
+                for (String file : bulkloadFilesWithFullPath) {
+                  // Full file path from S3

Review Comment:
   Please remove the hardcoded "S3" from this comment; the backup location is 
read from the configuration and need not be on S3.



##########
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java:
##########
@@ -172,6 +180,52 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
           ExtendedCell lastCell = null;
           for (ExtendedCell cell : WALEditInternalHelper.getExtendedCells(value)) {
             context.getCounter(Counter.CELLS_READ).increment(1);
+
+            if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+              String namespace = key.getTableName().getNamespaceAsString();
+              String tableName = key.getTableName().getQualifierAsString();
+              LOG.info("Processing bulk load for namespace: {}, table: {}", namespace, tableName);
+
+              List<String> bulkloadFiles = handleBulkLoadCell(cell);
+              LOG.info("Found {} bulk load files for table: {}", bulkloadFiles.size(), tableName);
+
+              // Prefix each file path with namespace and table name to construct the full paths
+              List<String> bulkloadFilesWithFullPath = bulkloadFiles.stream()
+                .map(filePath -> new Path(namespace, new Path(tableName, filePath)).toString())
+                .collect(Collectors.toList());
+              LOG.info("Bulk load files with full paths: {}", bulkloadFilesWithFullPath.size());
+
+              // Retrieve configuration and set up file systems for backup and staging locations
+              Configuration conf = context.getConfiguration();
+              Path backupLocation = new Path(conf.get(BULKLOAD_BACKUP_LOCATION));

Review Comment:
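   Please add a check for the case where `backupLocation` is not specified. 
`conf.get(...)` returns null for an unset key, and constructing a `Path` from a 
null string throws `IllegalArgumentException`.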
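
   One possible shape for that check, as a sketch only. The helper class and method names are hypothetical; `rawValue` stands for whatever `conf.get(BULKLOAD_BACKUP_LOCATION)` returned, so the snippet compiles without Hadoop on the classpath:

```java
import java.util.Optional;

/** Hypothetical helper that normalizes the configured bulk load backup location. */
final class BulkLoadBackupLocation {
  private BulkLoadBackupLocation() {
  }

  /**
   * Returns the configured location, or an empty Optional when the key was
   * unset (null) or blank. The caller decides whether to skip bulk load
   * handling or to fail with a descriptive error.
   */
  static Optional<String> fromConfigValue(String rawValue) {
    if (rawValue == null) {
      return Optional.empty();
    }
    String trimmed = rawValue.trim();
    return trimmed.isEmpty() ? Optional.empty() : Optional.of(trimmed);
  }
}
```

   The mapper could then skip bulk load cells, or throw an `IOException` that names the missing key, when the result is empty, instead of passing null into `new Path(...)`.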



##########
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java:
##########
@@ -172,6 +180,52 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
           ExtendedCell lastCell = null;
           for (ExtendedCell cell : WALEditInternalHelper.getExtendedCells(value)) {
             context.getCounter(Counter.CELLS_READ).increment(1);
+
+            if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {
+              String namespace = key.getTableName().getNamespaceAsString();
+              String tableName = key.getTableName().getQualifierAsString();
+              LOG.info("Processing bulk load for namespace: {}, table: {}", namespace, tableName);
+
+              List<String> bulkloadFiles = handleBulkLoadCell(cell);
+              LOG.info("Found {} bulk load files for table: {}", bulkloadFiles.size(), tableName);
+
+              // Prefix each file path with namespace and table name to construct the full paths
+              List<String> bulkloadFilesWithFullPath = bulkloadFiles.stream()
+                .map(filePath -> new Path(namespace, new Path(tableName, filePath)).toString())
+                .collect(Collectors.toList());
+              LOG.info("Bulk load files with full paths: {}", bulkloadFilesWithFullPath.size());
+
+              // Retrieve configuration and set up file systems for backup and staging locations
+              Configuration conf = context.getConfiguration();
+              Path backupLocation = new Path(conf.get(BULKLOAD_BACKUP_LOCATION));
+              FileSystem rootFs = CommonFSUtils.getRootDirFileSystem(conf); // HDFS filesystem
+              Path hbaseStagingDir = new Path(CommonFSUtils.getRootDir(conf), HConstants.BULKLOAD_STAGING_DIR_NAME);
+              FileSystem backupFs = FileSystem.get(backupLocation.toUri(), conf);
+
+              List<String> stagingPaths = new ArrayList<>();
+
+              try {
+                for (String file : bulkloadFilesWithFullPath) {

Review Comment:
   These are not full paths; they are relative paths that start at the 
namespace, so the name `bulkloadFilesWithFullPath` and the comment above it are 
misleading.
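
   To illustrate the distinction, here is a rough sketch that uses plain strings as a stand-in for Hadoop's `Path`. The class and method names and the example locations are made up for illustration:

```java
/** Hypothetical helper; plain strings stand in for org.apache.hadoop.fs.Path. */
final class BulkLoadPaths {
  private BulkLoadPaths() {
  }

  /** Builds the path relative to the backup root: namespace/table/fileUnderTable. */
  static String relativeToBackupRoot(String namespace, String table, String fileUnderTable) {
    return namespace + "/" + table + "/" + fileUnderTable;
  }

  /** Resolves a relative path against the backup root to obtain the full path. */
  static String resolve(String backupRoot, String relativePath) {
    String root = backupRoot;
    // Drop trailing slashes on the root so the join never produces "//".
    while (root.endsWith("/")) {
      root = root.substring(0, root.length() - 1);
    }
    String rel = relativePath;
    // Drop leading slashes on the relative part for the same reason.
    while (rel.startsWith("/")) {
      rel = rel.substring(1);
    }
    return root + "/" + rel;
  }
}
```

   Only the result of `resolve(...)` is a full path; the value currently held in `bulkloadFilesWithFullPath` corresponds to `relativeToBackupRoot(...)`.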



##########
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java:
##########
@@ -172,6 +180,52 @@ public void map(WALKey key, WALEdit value, Context context) throws IOException {
           ExtendedCell lastCell = null;
           for (ExtendedCell cell : WALEditInternalHelper.getExtendedCells(value)) {
             context.getCounter(Counter.CELLS_READ).increment(1);
+
+            if (CellUtil.matchingQualifier(cell, WALEdit.BULK_LOAD)) {

Review Comment:
   I believe the processing of bulkloaded files can be simplified, and we could 
reduce the log level from INFO to DEBUG or TRACE in some cases.



##########
hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/WALPlayer.java:
##########
@@ -293,6 +398,8 @@ public Job createSubmittableJob(String[] args) throws IOException {
     setupTime(conf, WALInputFormat.START_TIME_KEY);
     setupTime(conf, WALInputFormat.END_TIME_KEY);
     String inputDirs = args[0];
+    String walDir = new Path(inputDirs, "WALs").toString();

Review Comment:
   I don't think this is correct. We are hard-coding the directories here.
   We could introduce a new optional parameter that the user can specify if 
they have bulkloaded files for us to process.
   For example:
   `hbase org.apache.hadoop.hbase.mapreduce.WALPlayer -Dwal.bulk.backup.location=/bulkload-files-dir /backuplogdir oldTable1,oldTable2 newTable1,newTable2`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
