This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 6a5f9ca5b68 HBASE-28483 backup merge fails on bulkloaded hfiles (#5795)
6a5f9ca5b68 is described below

commit 6a5f9ca5b681f1e9684d76187daac05f3bc0cf76
Author: Thomas Sarens <thom...@ngdata.com>
AuthorDate: Sat Apr 6 19:22:17 2024 +0200

    HBASE-28483 backup merge fails on bulkloaded hfiles (#5795)
    
    Signed-off-by: Bryan Beaudreault <bbeaudrea...@apache.org>
---
 .../TestIncrementalBackupMergeWithBulkLoad.java    | 250 +++++++++++++++++++++
 .../hadoop/hbase/mapreduce/HFileInputFormat.java   |  25 ++-
 2 files changed, 266 insertions(+), 9 deletions(-)

diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithBulkLoad.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithBulkLoad.java
new file mode 100644
index 00000000000..c383a8545a8
--- /dev/null
+++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestIncrementalBackupMergeWithBulkLoad.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.backup;
+
+import static org.apache.hadoop.hbase.backup.BackupInfo.BackupState.COMPLETE;
+import static org.apache.hadoop.hbase.backup.BackupTestUtil.enableBackup;
+import static org.apache.hadoop.hbase.backup.BackupTestUtil.verifyBackup;
+import static org.apache.hadoop.hbase.backup.BackupType.FULL;
+import static org.apache.hadoop.hbase.backup.BackupType.INCREMENTAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseCommonTestingUtility;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.backup.impl.BackupAdminImpl;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testing.TestingHBaseCluster;
+import org.apache.hadoop.hbase.testing.TestingHBaseClusterOption;
+import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category(MediumTests.class)
+@RunWith(Parameterized.class)
+public class TestIncrementalBackupMergeWithBulkLoad {
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestIncrementalBackupMergeWithBulkLoad.class);
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestIncrementalBackupMergeWithBulkLoad.class);
+
+  @Parameterized.Parameters(name = "{index}: useBulkLoad={0}")
+  public static Iterable<Object[]> data() {
+    return HBaseCommonTestingUtility.BOOLEAN_PARAMETERIZED;
+  }
+
+  @Parameterized.Parameter(0)
+  public boolean useBulkLoad;
+
+  private TableName sourceTable;
+  private TableName targetTable;
+
+  private List<TableName> allTables;
+  private static TestingHBaseCluster cluster;
+  private static final Path BACKUP_ROOT_DIR = new Path("backupIT");
+  private static final byte[] COLUMN_FAMILY = Bytes.toBytes("0");
+
+  @BeforeClass
+  public static void beforeClass() throws Exception {
+    Configuration conf = HBaseConfiguration.create();
+    enableBackup(conf);
+    cluster = TestingHBaseCluster.create(TestingHBaseClusterOption.builder().conf(conf).build());
+    cluster.start();
+  }
+
+  @AfterClass
+  public static void afterClass() throws Exception {
+    cluster.stop();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    sourceTable = TableName.valueOf("table-" + useBulkLoad);
+    targetTable = TableName.valueOf("another-table-" + useBulkLoad);
+    allTables = Arrays.asList(sourceTable, targetTable);
+    createTable(sourceTable);
+    createTable(targetTable);
+  }
+
+  @Test
+  public void testMergeContainingBulkloadedHfiles() throws Exception {
+    Instant timestamp = Instant.now();
+
+    String backupId = backup(FULL, allTables);
+    BackupInfo backupInfo = verifyBackup(cluster.getConf(), backupId, FULL, COMPLETE);
+    assertTrue(backupInfo.getTables().contains(sourceTable));
+
+    // load some data
+    load(sourceTable, timestamp, "data");
+
+    String backupId1 = backup(INCREMENTAL, allTables);
+    backupInfo = verifyBackup(cluster.getConf(), backupId1, INCREMENTAL, COMPLETE);
+    assertTrue(backupInfo.getTables().contains(sourceTable));
+
+    String backupId2 = backup(INCREMENTAL, allTables);
+    backupInfo = verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
+    assertTrue(backupInfo.getTables().contains(sourceTable));
+
+    merge(new String[] { backupId1, backupId2 });
+    backupInfo = verifyBackup(cluster.getConf(), backupId2, INCREMENTAL, COMPLETE);
+    assertTrue(backupInfo.getTables().contains(sourceTable));
+    validateDataEquals(sourceTable, "data");
+  }
+
+  private void createTable(TableName tableName) throws IOException {
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(COLUMN_FAMILY));
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      Admin admin = connection.getAdmin()) {
+      admin.createTable(builder.build());
+    }
+  }
+
+  private void load(TableName tableName, Instant timestamp, String data) throws IOException {
+    if (useBulkLoad) {
+      hFileBulkLoad(tableName, timestamp, data);
+    } else {
+      putLoad(tableName, timestamp, data);
+    }
+  }
+
+  private void putLoad(TableName tableName, Instant timestamp, String data) throws IOException {
+    LOG.info("Writing new data to HBase using normal Puts: {}", data);
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf())) {
+      Table table = connection.getTable(sourceTable);
+      List<Put> puts = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        Put put = new Put(Bytes.toBytes(i), timestamp.toEpochMilli());
+        put.addColumn(COLUMN_FAMILY, Bytes.toBytes("data"), Bytes.toBytes(data));
+        puts.add(put);
+
+        if (i % 100 == 0) {
+          table.put(puts);
+          puts.clear();
+        }
+      }
+      if (!puts.isEmpty()) {
+        table.put(puts);
+      }
+      connection.getAdmin().flush(tableName);
+    }
+  }
+
+  private void hFileBulkLoad(TableName tableName, Instant timestamp, String data)
+    throws IOException {
+    FileSystem fs = FileSystem.get(cluster.getConf());
+    LOG.info("Writing new data to HBase using BulkLoad: {}", data);
+    // HFiles require this strict directory structure to allow them to be loaded
+    Path hFileRootPath = new Path("/tmp/hfiles_" + UUID.randomUUID());
+    fs.mkdirs(hFileRootPath);
+    Path hFileFamilyPath = new Path(hFileRootPath, Bytes.toString(COLUMN_FAMILY));
+    fs.mkdirs(hFileFamilyPath);
+    try (HFile.Writer writer = HFile.getWriterFactoryNoCache(cluster.getConf())
+      .withPath(fs, new Path(hFileFamilyPath, "hfile_" + UUID.randomUUID()))
+      .withFileContext(new HFileContextBuilder().withTableName(tableName.toBytes())
+        .withColumnFamily(COLUMN_FAMILY).build())
+      .create()) {
+      for (int i = 0; i < 10; i++) {
+        writer.append(new KeyValue(Bytes.toBytes(i), COLUMN_FAMILY, Bytes.toBytes("data"),
+          timestamp.toEpochMilli(), Bytes.toBytes(data)));
+      }
+    }
+    Map<BulkLoadHFiles.LoadQueueItem, ByteBuffer> result =
+      BulkLoadHFiles.create(cluster.getConf()).bulkLoad(tableName, hFileRootPath);
+    assertFalse(result.isEmpty());
+  }
+
+  private String backup(BackupType backupType, List<TableName> tables) throws IOException {
+    LOG.info("Creating the backup ...");
+
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+      BackupRequest backupRequest =
+        new BackupRequest.Builder().withTargetRootDir(BACKUP_ROOT_DIR.toString())
+          .withTableList(new ArrayList<>(tables)).withBackupType(backupType).build();
+      return backupAdmin.backupTables(backupRequest);
+    }
+  }
+
+  private void merge(String[] backupIds) throws IOException {
+    LOG.info("Merging the backups ...");
+
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      BackupAdmin backupAdmin = new BackupAdminImpl(connection)) {
+      backupAdmin.mergeBackups(backupIds);
+    }
+  }
+
+  private void validateDataEquals(TableName tableName, String expectedData) throws IOException {
+    try (Connection connection = ConnectionFactory.createConnection(cluster.getConf());
+      Table table = connection.getTable(tableName)) {
+      Scan scan = new Scan();
+      scan.readAllVersions();
+      scan.setRaw(true);
+      scan.setBatch(100);
+
+      for (Result sourceResult : table.getScanner(scan)) {
+        List<Cell> sourceCells = sourceResult.listCells();
+        for (Cell cell : sourceCells) {
+          assertEquals(expectedData, Bytes.toStringBinary(cell.getValueArray(),
+            cell.getValueOffset(), cell.getValueLength()));
+        }
+      }
+    }
+  }
+
+}
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
index 1fdcf4bcfd4..1bbbe513f73 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/HFileInputFormat.java
@@ -19,7 +19,6 @@ package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.OptionalLong;
 import org.apache.hadoop.conf.Configuration;
@@ -152,19 +151,27 @@ public class HFileInputFormat extends FileInputFormat<NullWritable, Cell> {
     List<FileStatus> result = new ArrayList<FileStatus>();
 
     // Explode out directories that match the original FileInputFormat filters
-    // since HFiles are written to directories where the
-    // directory name is the column name
+    // Typically these are <regionname>-level dirs, only requiring 1 level of recursion to
+    // get the <columnname>-level dirs where the HFiles are written, but in some cases
+    // <tablename>-level dirs are provided, requiring 2 levels of recursion.
     for (FileStatus status : super.listStatus(job)) {
-      if (status.isDirectory()) {
-        FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
-        Collections.addAll(result, fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER));
-      } else {
-        result.add(status);
-      }
+      addFilesRecursively(job, status, result);
     }
     return result;
   }
 
+  private static void addFilesRecursively(JobContext job, FileStatus status,
+    List<FileStatus> result) throws IOException {
+    if (status.isDirectory()) {
+      FileSystem fs = status.getPath().getFileSystem(job.getConfiguration());
+      for (FileStatus fileStatus : fs.listStatus(status.getPath(), HIDDEN_FILE_FILTER)) {
+        addFilesRecursively(job, fileStatus, result);
+      }
+    } else {
+      result.add(status);
+    }
+  }
+
   @Override
   public RecordReader<NullWritable, Cell> createRecordReader(InputSplit split,
     TaskAttemptContext context) throws IOException, InterruptedException {

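For readers who want to see the listing change in isolation: below is a minimal, standalone sketch (not part of the commit) of the recursive-listing idea that the HFileInputFormat hunk above implements. The class name, the example paths, and the hidden-file filter here are illustrative assumptions; the real class uses its own HIDDEN_FILE_FILTER and the JobContext configuration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RecursiveHFileListingSketch {

  // Illustrative stand-in for HFileInputFormat's HIDDEN_FILE_FILTER:
  // skip names starting with "_" or "." (e.g. "_SUCCESS", ".tmp").
  private static final PathFilter VISIBLE_ONLY =
    path -> !path.getName().startsWith("_") && !path.getName().startsWith(".");

  // Same shape as the addFilesRecursively added above: descend through any
  // number of directory levels (family dir, region dir, or table dir) and
  // collect only the regular files underneath.
  static void addFilesRecursively(FileSystem fs, FileStatus status, List<FileStatus> result)
    throws IOException {
    if (status.isDirectory()) {
      for (FileStatus child : fs.listStatus(status.getPath(), VISIBLE_ONLY)) {
        addFilesRecursively(fs, child, result);
      }
    } else {
      result.add(status);
    }
  }

  public static void main(String[] args) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    // Hypothetical bulk-load style layout: <table>/<region>/<family>/<hfile>.
    Path tableLevelDir = new Path("/tmp/bulkload-example/table1");
    List<FileStatus> files = new ArrayList<>();
    addFilesRecursively(fs, fs.getFileStatus(tableLevelDir), files);
    for (FileStatus file : files) {
      System.out.println(file.getPath());
    }
  }
}

Whether the starting point is a <columnname>-level, <regionname>-level, or <tablename>-level directory, the recursion yields the same flat list of HFiles, which is the property the fix above relies on when merging backups that contain bulkloaded HFiles.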