Apache9 commented on a change in pull request #941: HBASE-23326 Implement a ProcedureStore which stores procedures in a H…
URL: https://github.com/apache/hbase/pull/941#discussion_r358028295
 
 

 ##########
 File path: hbase-server/src/main/java/org/apache/hadoop/hbase/procedure2/store/region/RegionProcedureStore.java
 ##########
 @@ -0,0 +1,391 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.procedure2.store.region;
+
+import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureStoreBase;
+import org.apache.hadoop.hbase.procedure2.store.ProcedureTree;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
+import org.apache.hadoop.hbase.regionserver.RegionScanner;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ * A procedure store which uses a region to store all the procedures.
+ * <p/>
+ * FileSystem layout:
+ * 
+ * <pre>
+ * hbase
+ *   |
+ *   --MasterProcs
+ *       |
+ *       --data
+ *       |  |
+ *       |  --/hbase/procedure/encodedRegionName <---- The region data
+ *       |      |
+ *       |      --replay <---- The edits to replay
+ *       |
+ *       --WALs
+ *          |
+ *          --host,port,ts <---- The WAL dir for active master 
+ *          |
+ *          --host,port,ts-dead <---- The WAL dir for a dead master
+ * </pre>
+ */
+@InterfaceAudience.Private
+public class RegionProcedureStore extends ProcedureStoreBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionProcedureStore.class);
+
+  private static final String MASTER_PROCEDURE_DIR = "MasterProcs";
+
+  private static final String DATA_DIR = "data";
+
+  private static final String REPLAY_EDITS_DIR = "replay";
+
+  private static final String DEAD_WAL_DIR_SUFFIX = "-dead";
+
+  private static final TableName TABLE_NAME = TableName.valueOf("hbase:procedure");
+
+  private static final byte[] FAMILY = Bytes.toBytes("info");
+
+  private static final byte[] PROC_QUALIFIER = Bytes.toBytes("proc");
+
+  private static final TableDescriptor TABLE_DESC = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+    .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
+
+  private final Configuration conf;
+
+  private final Server server;
+
+  private WALFactory walFactory;
+
+  private HRegion region;
+
+  private RegionFlusherAndCompactor flusherAndCompactor;
+
+  private RegionProcedureStoreWALRoller walRoller;
+
+  private int numThreads;
+
+  public RegionProcedureStore(Configuration conf, Server server) {
+    this.conf = new Configuration(conf);
+    this.server = server;
+  }
+
+  @Override
+  public void start(int numThreads) throws IOException {
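+    // Nothing to start for the region based store; just record the count for getNumThreads().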
+    this.numThreads = numThreads;
+  }
+
+  @Override
+  public void stop(boolean abort) {
+    if (flusherAndCompactor != null) {
+      flusherAndCompactor.close();
+    }
+    if (region != null) {
+      try {
+        region.close();
+      } catch (IOException e) {
+        LOG.warn("Failed to close region", e);
+      }
+    }
+    if (walFactory != null) {
+      try {
+        walFactory.shutdown();
+      } catch (IOException e) {
+        LOG.warn("Failed to shutdown WAL", e);
+      }
+    }
+    if (walRoller != null) {
+      walRoller.close();
+    }
+  }
+
+  @Override
+  public int getNumThreads() {
+    return numThreads;
+  }
+
+  @Override
+  public int setRunningProcedureCount(int count) {
+    // useless for region based storage.
+    return count;
+  }
+
+  private WAL createWAL(RegionInfo regionInfo) throws IOException {
+    WAL wal = walFactory.getWAL(regionInfo);
+    walRoller.addWAL(wal);
+    return wal;
+  }
+
+  private HRegion bootstrap(FileSystem fs, Path dataDir) throws IOException {
+    RegionInfo regionInfo = RegionInfoBuilder.newBuilder(TABLE_NAME).build();
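+    // Create the region under a sibling "-tmp" directory first and rename it into place once it
+    // is fully written, so a crash during bootstrap never leaves a partially created region.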
+    Path tmpDataDir = new Path(dataDir.getParent(), dataDir.getName() + "-tmp");
+    if (fs.exists(tmpDataDir) && !fs.delete(tmpDataDir, true)) {
+      throw new IOException("Can not delete partial created proc region " + 
tmpDataDir);
+    }
+    Path tableDir = CommonFSUtils.getTableDir(tmpDataDir, TABLE_NAME);
+    HRegion.createHRegion(conf, regionInfo, fs, tableDir, TABLE_DESC).close();
+    if (!fs.rename(tmpDataDir, dataDir)) {
+      throw new IOException("Can not rename " + tmpDataDir + " to " + dataDir);
+    }
+    WAL wal = createWAL(regionInfo);
+    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
+      null);
+  }
+
+  private HRegion open(FileSystem fs, Path rootDir, Path dataDir) throws IOException {
+    String factoryId = server.getServerName().toString();
+    Path tableDir = CommonFSUtils.getTableDir(dataDir, TABLE_NAME);
+    Path regionDir =
+      fs.listStatus(tableDir, p -> RegionInfo.isEncodedRegionName(Bytes.toBytes(p.getName())))[0]
+        .getPath();
+    Path replayEditsDir = new Path(regionDir, REPLAY_EDITS_DIR);
+    Path walsDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
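+    // Fence off the WAL directories of other (dead) masters by renaming them with the -dead
+    // suffix, then move their WAL files into the replay directory so they are replayed as
+    // recovered edits when the region is opened below.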
+    for (FileStatus walDir : fs.listStatus(walsDir)) {
+      if (!walDir.isDirectory()) {
+        continue;
+      }
+      if (walDir.getPath().getName().startsWith(factoryId)) {
+        // it's ourselves, continue
+        continue;
+      }
+      Path deadWALDir;
+      if (!walDir.getPath().getName().endsWith(DEAD_WAL_DIR_SUFFIX)) {
+        deadWALDir =
+          new Path(walDir.getPath().getParent(), walDir.getPath().getName() + DEAD_WAL_DIR_SUFFIX);
+        if (!fs.rename(walDir.getPath(), deadWALDir)) {
+          throw new IOException("Can not rename " + walDir + " to " + 
deadWALDir +
+            " when recovering lease of proc store");
+        }
+        LOG.info("Renamed {} to {} as it is dead", walDir.getPath(), 
deadWALDir);
+      } else {
+        deadWALDir = walDir.getPath();
+        LOG.info("{} is already marked as dead", deadWALDir);
+      }
+      for (FileStatus walFile : fs.listStatus(deadWALDir)) {
+        Path replayEditsFile = new Path(replayEditsDir, walFile.getPath().getName());
+        if (!fs.rename(walFile.getPath(), replayEditsFile)) {
+          throw new IOException("Can not rename " + walDir + " to " + 
deadWALDir +
+            " when recovering lease of proc store");
+        }
+        LOG.info("Renamed {} to {}", walFile.getPath(), replayEditsFile);
+      }
+      LOG.info("Delete empty proc wal dir {}", deadWALDir);
+      fs.delete(deadWALDir, true);
+    }
+    RegionInfo regionInfo = HRegionFileSystem.loadRegionInfoFileContent(fs, regionDir);
+    WAL wal = createWAL(regionInfo);
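+    // Point the region at the replay directory; openHRegionFromTableDir will replay these WAL
+    // entries as recovered edits.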
+    conf.set(HRegion.RECOVERED_EDITS_DIR,
+      replayEditsDir.makeQualified(fs.getUri(), fs.getWorkingDirectory()).toString());
+    return HRegion.openHRegionFromTableDir(conf, fs, tableDir, regionInfo, TABLE_DESC, wal, null,
+      null);
+  }
+
+  @Override
+  public void recoverLease() throws IOException {
+    FileSystem fs = CommonFSUtils.getWALFileSystem(conf);
+    Path rootDir = new Path(CommonFSUtils.getWALRootDir(conf), MASTER_PROCEDURE_DIR);
+    Path dataDir = new Path(rootDir, DATA_DIR);
+    CommonFSUtils.setWALRootDir(conf, rootDir);
+    walRoller = RegionProcedureStoreWALRoller.create(conf, server);
+
+    walFactory = new WALFactory(conf, server.getServerName().toString(), false);
+    if (fs.exists(dataDir)) {
+      // load the existing region.
+      region = open(fs, rootDir, dataDir);
+    } else {
+      // bootstrapping...
+      region = bootstrap(fs, dataDir);
+    }
+    flusherAndCompactor = new RegionFlusherAndCompactor(conf, server, region);
+    walRoller.setFlusherAndCompactor(flusherAndCompactor);
+    // TODO: migrate old procedure data into the region.
+  }
+
+  @Override
+  public void load(ProcedureLoader loader) throws IOException {
+    List<ProcedureProtos.Procedure> procs = new ArrayList<>();
+    long maxProcId = 0;
+    try (RegionScanner scanner = region.getScanner(new Scan())) {
+      List<Cell> cells = new ArrayList<>();
+      boolean moreRows;
+      do {
+        moreRows = scanner.next(cells);
+        for (Cell cell : cells) {
+          ProcedureProtos.Procedure proto = ProcedureProtos.Procedure.parser()
+            .parseFrom(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+          procs.add(proto);
+          maxProcId = Math.max(maxProcId, proto.getProcId());
+        }
+        // Clear the buffer, otherwise cells from this batch would be parsed again next iteration.
+        cells.clear();
+      } while (moreRows);
+    }
+    loader.setMaxProcId(maxProcId);
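+    // Rebuild the procedure tree from the raw protos and hand valid and corrupted procedures to
+    // the loader separately.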
+    ProcedureTree tree = ProcedureTree.build(procs);
+    loader.load(tree.getValidProcs());
+    loader.handleCorrupted(tree.getCorruptedProcs());
+  }
+
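+  // Each procedure is stored as a single row whose key is the procedure id, with the serialized
+  // proto in the info:proc column.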
+  private void serializePut(Procedure<?> proc, List<Mutation> mutations, List<byte[]> rowsToLock)
+    throws IOException {
+    ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
+    byte[] row = Bytes.toBytes(proc.getProcId());
+    mutations.add(new Put(row).addColumn(FAMILY, PROC_QUALIFIER, proto.toByteArray()));
+    rowsToLock.add(row);
+  }
+
+  private void serializeDelete(long procId, List<Mutation> mutations, List<byte[]> rowsToLock) {
+    byte[] row = Bytes.toBytes(procId);
+    mutations.add(new Delete(row));
+    rowsToLock.add(row);
+  }
+
+  @Override
+  public void insert(Procedure<?> proc, Procedure<?>[] subProcs) {
+    if (subProcs == null || subProcs.length == 0) {
+      // same as update, just insert a single procedure
+      update(proc);
+      return;
+    }
+    List<Mutation> mutations = new ArrayList<>(subProcs.length + 1);
+    List<byte[]> rowsToLock = new ArrayList<>(subProcs.length + 1);
+    try {
+      serializePut(proc, mutations, rowsToLock);
+      for (Procedure<?> subProc : subProcs) {
+        serializePut(subProc, mutations, rowsToLock);
+      }
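+      // Persist the parent and all sub procedures atomically in a single multi-row mutation.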
+      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to insert proc {}, sub procs {}", 
proc,
+        Arrays.toString(subProcs), e);
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public void insert(Procedure<?>[] procs) {
+    List<Mutation> mutations = new ArrayList<>(procs.length);
+    List<byte[]> rowsToLock = new ArrayList<>(procs.length);
+    try {
+      for (Procedure<?> proc : procs) {
+        serializePut(proc, mutations, rowsToLock);
+      }
+      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to insert procs {}", 
Arrays.toString(procs), e);
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public void update(Procedure<?> proc) {
+    try {
+      ProcedureProtos.Procedure proto = ProcedureUtil.convertToProtoProcedure(proc);
+      region.put(new Put(Bytes.toBytes(proc.getProcId())).addColumn(FAMILY, PROC_QUALIFIER,
+        proto.toByteArray()));
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to update proc {}", proc, e);
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public void delete(long procId) {
+    try {
+      region.delete(new Delete(Bytes.toBytes(procId)));
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to delete proc id={}", procId, e);
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public void delete(Procedure<?> parentProc, long[] subProcIds) {
+    List<Mutation> mutations = new ArrayList<>(subProcIds.length + 1);
+    List<byte[]> rowsToLock = new ArrayList<>(subProcIds.length + 1);
+    try {
+      serializePut(parentProc, mutations, rowsToLock);
+      for (long subProcId : subProcIds) {
+        serializeDelete(subProcId, mutations, rowsToLock);
+      }
+      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to delete parent proc {}, sub proc 
ids={}", parentProc,
+        Arrays.toString(subProcIds), e);
+      throw new UncheckedIOException(e);
+    }
+  }
+
+  @Override
+  public void delete(long[] procIds, int offset, int count) {
+    if (count == 0) {
+      return;
+    }
+    if (count == 1) {
+      delete(procIds[offset]);
+      return;
+    }
+    List<Mutation> mutations = new ArrayList<>(count);
+    List<byte[]> rowsToLock = new ArrayList<>(count);
+    for (int i = 0; i < count; i++) {
+      serializeDelete(procIds[offset + i], mutations, rowsToLock);
+    }
+    try {
+      region.mutateRowsWithLocks(mutations, rowsToLock, NO_NONCE, NO_NONCE);
+    } catch (IOException e) {
+      LOG.error(HBaseMarkers.FATAL, "Failed to delete proc ids={}", 
Arrays.toString(procIds), e);
+      throw new UncheckedIOException(e);
+    }
+  }
+}
 
 Review comment:
  Yes, much less code than before; the only overhead is that we need to implement the flush and compaction scheduling ourselves.
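
  For context, a minimal sketch of what such a schedule could look like (hypothetical: the
  actual RegionFlusherAndCompactor in this PR is not shown in this diff, and the class and
  field names below are illustrative only, not the PR's implementation):

      import java.io.IOException;
      import org.apache.hadoop.hbase.regionserver.HRegion;

      // Hypothetical sketch of the flush/compaction scheduling the store must do itself.
      final class FlushAndCompactSketch {
        private final HRegion region;
        private final long flushSize;     // flush once the memstore grows past this, e.g. 64 MB
        private final int maxStoreFiles;  // compact once this many store files accumulate

        FlushAndCompactSketch(HRegion region, long flushSize, int maxStoreFiles) {
          this.region = region;
          this.flushSize = flushSize;
          this.maxStoreFiles = maxStoreFiles;
        }

        void flushIfNecessary() throws IOException {
          // HRegion tracks its memstore heap usage; flush all stores when it grows too large.
          if (region.getMemStoreHeapSize() >= flushSize) {
            region.flush(true);
          }
        }

        void compactIfNecessary() throws IOException {
          // The procedure table has a single column family, hence a single store.
          if (region.getStores().get(0).getStorefilesCount() >= maxStoreFiles) {
            // Major compaction also cleans up rows of deleted procedures.
            region.compact(true);
          }
        }
      }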
