mymeiyi commented on a change in pull request #163: HBASE-21995 Add a 
coprocessor to set HDFS ACL for hbase granted user
URL: https://github.com/apache/hbase/pull/163#discussion_r290258244
 
 

 ##########
 File path: 
hbase-server/src/main/java/org/apache/hadoop/hbase/security/access/SnapshotScannerHDFSAclHelper.java
 ##########
 @@ -0,0 +1,745 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.security.access;
+
+import static org.apache.hadoop.fs.permission.AclEntryScope.ACCESS;
+import static org.apache.hadoop.fs.permission.AclEntryScope.DEFAULT;
+import static org.apache.hadoop.fs.permission.AclEntryType.GROUP;
+import static org.apache.hadoop.fs.permission.AclEntryType.USER;
+import static org.apache.hadoop.fs.permission.FsAction.READ_EXECUTE;
+import static org.apache.hadoop.hbase.security.access.Permission.Action.READ;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclEntryScope;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.AuthUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.SnapshotDescription;
+import org.apache.hadoop.hbase.mob.MobUtils;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * A helper to set HBase granted user access acl and default acl over hFiles.
+ */
[email protected]
+public class SnapshotScannerHDFSAclHelper implements Closeable {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SnapshotScannerHDFSAclHelper.class);
+
  /** Conf key: whether the scan-snapshot HDFS acl feature is enabled. */
  public static final String USER_SCAN_SNAPSHOT_ENABLE = "hbase.user.scan.snapshot.enable";
  /** Conf key: size of the thread pool used to set HDFS acls in parallel (default 10). */
  public static final String USER_SCAN_SNAPSHOT_THREAD_NUMBER =
      "hbase.user.scan.snapshot.thread.number";
  // The tmp directory used to restore snapshots; it must NOT be a sub-directory of the
  // HBase root dir
  public static final String SNAPSHOT_RESTORE_TMP_DIR = "hbase.snapshot.restore.tmp.dir";
  public static final String SNAPSHOT_RESTORE_TMP_DIR_DEFAULT =
      "/hbase/.tmpdir-to-restore-snapshot";
  // When this feature is enabled, the public directories get this permission (default 751)
  public static final String COMMON_DIRECTORY_PERMISSION =
      "hbase.user.scan.snapshot.common.directory.permission";
  public static final String COMMON_DIRECTORY_PERMISSION_DEFAULT = "751";
  // When this feature is enabled, the restore directory gets this permission (default 703)
  public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION =
      "hbase.user.scan.snapshot.restore.directory.permission";
  public static final String SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT = "703";

  // Admin obtained in the constructor from the (possibly internally created) connection
  private Admin admin;
  private final Configuration conf;
  // filesystem resolved from the HBase directory layout (see PathHelper)
  private FileSystem fs;
  private PathHelper pathHelper;
  // daemon worker threads named "hdfs-acl-thread-%d"; shut down in close()
  private ExecutorService pool;
+
+  public SnapshotScannerHDFSAclHelper(Configuration configuration, Connection 
connection)
+      throws IOException {
+    this.conf = configuration;
+    this.pathHelper = new PathHelper(conf);
+    this.fs = pathHelper.getFileSystem();
+    this.pathHelper = new PathHelper(conf);
+    this.pool = 
Executors.newFixedThreadPool(conf.getInt(USER_SCAN_SNAPSHOT_THREAD_NUMBER, 10),
+      new 
ThreadFactoryBuilder().setNameFormat("hdfs-acl-thread-%d").setDaemon(true).build());
+    if (connection == null) {
+      connection = ConnectionFactory.createConnection(conf);
+    }
+    this.admin = connection.getAdmin();
+  }
+
+  @Override
+  public void close() {
+    if (pool != null) {
+      pool.shutdown();
+    }
+  }
+
+  public void setCommonDirPermission() throws IOException {
+    // Set public directory permission to 751 to make all users have access 
permission.
+    // And we also need the access permission of the parent of HBase root 
directory, but
+    // it's not set here, because the owner of HBase root directory may don't 
own permission
+    // to change it's parent permission to 751.
+    // The {root/.tmp} and {root/.tmp/data} directories are created to make 
global user HDFS
+    // acls can be inherited.
+    Path[] paths = new Path[] { pathHelper.getRootDir(), 
pathHelper.getDataDir(),
+        pathHelper.getMobDir(), pathHelper.getMobDataDir(), 
pathHelper.getTmpDir(),
+        pathHelper.getTmpDataDir(), pathHelper.getArchiveDir(), 
pathHelper.getArchiveDataDir(),
+        pathHelper.getSnapshotRootDir() };
+    for (Path path : paths) {
+      if (!fs.exists(path)) {
+        fs.mkdirs(path);
+      }
+      fs.setPermission(path, new FsPermission(
+          conf.get(COMMON_DIRECTORY_PERMISSION, 
COMMON_DIRECTORY_PERMISSION_DEFAULT)));
+    }
+    // create snapshot restore directory
+    Path restoreDir =
+        new Path(conf.get(SNAPSHOT_RESTORE_TMP_DIR, 
SNAPSHOT_RESTORE_TMP_DIR_DEFAULT));
+    if (!fs.exists(restoreDir)) {
+      fs.mkdirs(restoreDir);
+      fs.setPermission(restoreDir, new 
FsPermission(conf.get(SNAPSHOT_RESTORE_DIRECTORY_PERMISSION,
+        SNAPSHOT_RESTORE_DIRECTORY_PERMISSION_DEFAULT)));
+    }
+  }
+
+  /**
+   * Set acl when grant user permission
+   * @param userPerm the user and permission
+   * @param skipNamespaces the namespace set to skip set acl because already 
set
+   * @param skipTables the table set to skip set acl because already set
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean grantAcl(UserPermission userPerm, Set<String> skipNamespaces,
+      Set<TableName> skipTables) {
+    try {
+      long start = System.currentTimeMillis();
+      setHDFSAcl(getHdfsAclOperations(userPerm, 
HDFSAclOperation.OperationType.MODIFY,
+        skipNamespaces, skipTables));
+      LOG.info("Set HDFS acl when grant {}, cost {} ms", userPerm.toString(),
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Set HDFS acl error when grant: {}", userPerm != null ? 
userPerm.toString() : null,
+        e);
+      return false;
+    }
+  }
+
+  /**
+   * Remove acl when grant or revoke user permission
+   * @param userPerm the user and permission
+   * @param skipNamespaces the namespace set to skip remove acl
+   * @param skipTables the table set to skip remove acl
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean revokeAcl(UserPermission userPerm, Set<String> skipNamespaces,
+      Set<TableName> skipTables) {
+    try {
+      long start = System.currentTimeMillis();
+      setHDFSAcl(getHdfsAclOperations(userPerm, 
HDFSAclOperation.OperationType.REMOVE,
+        skipNamespaces, skipTables));
+      LOG.info("Set HDFS acl when revoke {}, cost {} ms", userPerm.toString(),
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Set HDFS acl error when revoke: {}", userPerm != null ? 
userPerm.toString() : null,
+        e);
+      return false;
+    }
+  }
+
+  /**
+   * Set acl when take a snapshot
+   * @param snapshot the snapshot desc
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean snapshotAcl(SnapshotDescription snapshot) {
+    try {
+      long start = System.currentTimeMillis();
+      TableName tableName = snapshot.getTableName();
+      // global user permission can be inherited from default acl automatically
+      Set<String> userSet = getUsersWithTableReadAction(tableName);
+      
userSet.addAll(getUsersWithNamespaceReadAction(tableName.getNamespaceAsString()));
+      Path path = pathHelper.getSnapshotDir(snapshot.getName());
+      List<HDFSAclOperation> operations = new ArrayList<>(1);
+      operations.add(new HDFSAclOperation(fs, path, userSet, 
HDFSAclOperation.OperationType.MODIFY,
+          READ_EXECUTE, true, new ArrayList<>()));
+      setHDFSAcl(new Pair<>(new ArrayList<>(), operations));
+      LOG.info("Set HDFS acl when snapshot {}, cost {} ms", snapshot.getName(),
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Set HDFS acl error when snapshot {}", snapshot != null ? 
snapshot.getName() : null,
+        e);
+      return false;
+    }
+  }
+
+  /**
+   * Reset acl when truncate table
+   * @param tableName the specific table
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean resetTableAcl(TableName tableName) {
+    try {
+      long start = System.currentTimeMillis();
+      Set<String> users = getUsersWithTableReadAction(tableName);
+      setHDFSAcl(
+        getTableHdfsAclOperations(users, 
HDFSAclOperation.OperationType.MODIFY, tableName));
+      LOG.info("Set HDFS acl when truncate {}, cost {} ms", tableName,
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Set HDFS acl error when truncate {}", tableName != null ? 
tableName : null, e);
+      return false;
+    }
+  }
+
+  /**
+   * Remove table acl from ns dir when delete table
+   * @param tableName the table
+   * @param removeUsers the users whose acl will be removed
+   * @return false if an error occurred, otherwise true
+   */
+  public boolean removeNamespaceAcl(TableName tableName, Set<String> 
removeUsers) {
+    try {
+      long start = System.currentTimeMillis();
+      List<AclEntry> aclEntries =
+          removeUsers.stream().map(removeUser -> aclEntry(ACCESS, removeUser, 
READ_EXECUTE))
+              .collect(Collectors.toList());
+      Path nsPath = pathHelper.getDataNsDir(tableName.getNamespaceAsString());
+      fs.removeAclEntries(nsPath, aclEntries);
+      LOG.info("Remove HDFS acl when delete table {}, cost {} ms", tableName,
+        System.currentTimeMillis() - start);
+      return true;
+    } catch (Exception e) {
+      LOG.error("Set HDFS acl error when delete table {}", tableName != null ? 
tableName : null, e);
+      return false;
+    }
+  }
+
+  /**
+   * Set table owner acl when create table
+   * @param tableName the table
+   * @param user the table owner
+   */
+  public void addTableAcl(TableName tableName, String user) {
+    try {
+      long start = System.currentTimeMillis();
+      List<AclEntry> aclEntries = new ArrayList<>(2);
+      AclEntry accessAclEntry = aclEntry(ACCESS, user, READ_EXECUTE);
+      aclEntries.add(accessAclEntry);
+      aclEntries.add(aclEntry(DEFAULT, user, READ_EXECUTE));
+      // set access and default HDFS acl for table dir
+      fs.modifyAclEntries(pathHelper.getTmpTableDir(tableName), aclEntries);
+      fs.modifyAclEntries(pathHelper.getArchiveTableDir(tableName), 
aclEntries);
+      Path tableDir = pathHelper.getDataTableDir(tableName);
+      HDFSAclOperation operation = new HDFSAclOperation(fs, tableDir, 
Sets.newHashSet(user),
+          HDFSAclOperation.OperationType.MODIFY, READ_EXECUTE, true, new 
ArrayList<>(0));
+      setSingleHDFSAcl(operation).get();
+      // set access HDFS acl for ns dir
+      List<Path> nsPath = 
getDefaultNamespacePath(tableName.getNamespaceAsString(), false);
+      for (Path path : nsPath) {
+        fs.modifyAclEntries(path, Lists.newArrayList(accessAclEntry));
+      }
+      LOG.info("Set HDFS acl when create table {}, cost {} ms", tableName,
+        System.currentTimeMillis() - start);
+    } catch (Exception e) {
+      LOG.error("Set HDFS acl error when create table {}", tableName != null ? 
tableName : null, e);
+    }
+  }
+
+  /**
+   * Generate the {@link HDFSAclOperation} list
+   * @param userPermission the user and permission
+   * @param operationType MODIFY or REMOVE HDFS acl
+   * @param skipNamespaces a namespace list whose related paths will be 
skipped when set HDFS acl
+   * @param skipTables a table list whose related paths will be skipped when 
set HDFS acl
+   * @return the {@link HDFSAclOperation} list
+   * @throws IOException if an error occurred
+   */
+  private Pair<List<HDFSAclOperation>, List<HDFSAclOperation>> 
getHdfsAclOperations(
+      UserPermission userPermission, HDFSAclOperation.OperationType 
operationType,
+      Set<String> skipNamespaces, Set<TableName> skipTables) throws 
IOException {
+    Pair<List<HDFSAclOperation>, List<HDFSAclOperation>> hdfsAclOperations = 
new Pair<>();
+    Set<String> users = Sets.newHashSet(userPermission.getUser());
+    switch (userPermission.getAccessScope()) {
+      case GLOBAL:
+        hdfsAclOperations =
+            getGlobalHdfsAclOperations(users, operationType, skipNamespaces, 
skipTables);
+        break;
+      case NAMESPACE:
+        NamespacePermission namespacePermission =
+            (NamespacePermission) userPermission.getPermission();
+        hdfsAclOperations = getNamespaceHdfsAclOperations(users, operationType,
+          namespacePermission.getNamespace(), skipTables);
+        break;
+      case TABLE:
+        TablePermission tablePermission = (TablePermission) 
userPermission.getPermission();
+        hdfsAclOperations =
+            getTableHdfsAclOperations(users, operationType, 
tablePermission.getTableName());
+        break;
+      default:
+        LOG.error("Unknown scope for permission {}", userPermission);
+    }
+    return hdfsAclOperations;
+  }
+
  /**
   * Build the acl operations for a GLOBAL-scope permission change.
   * <p>
   * The first list covers the tmp/data/mob/archive trees (applied recursively, skipping
   * namespace/table paths that already carry their own acls); the second list covers the
   * snapshot root and is applied in parallel by {@code setHDFSAcl}.
   * @param users the users whose acls change
   * @param operationType MODIFY or REMOVE
   * @param skipNamespaces namespaces to skip (already handled elsewhere)
   * @param skipTables tables to skip (already handled elsewhere)
   * @return pair of (sequential operations, parallel snapshot-root operations)
   * @throws IOException if resolving snapshot paths fails
   */
  private Pair<List<HDFSAclOperation>, List<HDFSAclOperation>> getGlobalHdfsAclOperations(
      Set<String> users, HDFSAclOperation.OperationType operationType, Set<String> skipNamespaces,
      Set<TableName> skipTables) throws IOException {
    // paths that get the recursive default acl
    List<Path> defaultGlobalPathList = getDefaultGlobalPath();
    // paths to skip: all dirs (incl. snapshots) of the skipped namespaces...
    List<Path> skipPaths = Lists.newArrayList();
    for (String namespace : skipNamespaces) {
      skipPaths.addAll(getDefaultNamespacePath(namespace, true));
    }
    // ...and of the skipped tables; remember the tables' namespace dirs for the REMOVE case
    List<Path> tableNsPathList = new ArrayList<>();
    for (TableName tableName : skipTables) {
      skipPaths.addAll(getDefaultTablePath(tableName, true));
      tableNsPathList.addAll(getDefaultNamespacePath(tableName.getNamespaceAsString(), false));
    }
    List<HDFSAclOperation> hdfsAclOperations = defaultGlobalPathList.stream().map(
      path -> new HDFSAclOperation(fs, path, users, operationType, READ_EXECUTE, true, skipPaths))
        .collect(Collectors.toList());
    if (operationType == HDFSAclOperation.OperationType.REMOVE) {
      // when revoking globally, users keeping table-level access still need a (non-recursive)
      // access acl on the skipped tables' namespace dirs, so re-add it here
      hdfsAclOperations.addAll(tableNsPathList
          .stream().map(path -> new HDFSAclOperation(fs, path, users,
              HDFSAclOperation.OperationType.MODIFY, READ_EXECUTE, false, new ArrayList<>()))
          .collect(Collectors.toList()));
    }
    // snapshot root is large: handled as the parallel half of the pair
    Path snapshotPath = pathHelper.getSnapshotRootDir();
    HDFSAclOperation snapshotHdfsAclOperation =
        new HDFSAclOperation(fs, snapshotPath, users, operationType, READ_EXECUTE, true, skipPaths);
    return new Pair<>(hdfsAclOperations, Lists.newArrayList(snapshotHdfsAclOperation));
  }
+
  /**
   * Build the acl operations for a NAMESPACE-scope permission change.
   * @param users the users whose acls change
   * @param operationType MODIFY or REMOVE
   * @param namespace the namespace whose paths are updated
   * @param skipTables tables in the namespace to skip (already handled elsewhere)
   * @return pair of (sequential namespace-dir operations, parallel snapshot operations)
   * @throws IOException if listing snapshots fails
   */
  private Pair<List<HDFSAclOperation>, List<HDFSAclOperation>> getNamespaceHdfsAclOperations(
      Set<String> users, HDFSAclOperation.OperationType operationType, String namespace,
      Set<TableName> skipTables) throws IOException {
    // namespace dirs that get the recursive default acl
    List<Path> defaultNsPathList = getDefaultNamespacePath(namespace, false);
    // skipped tables' dirs (incl. their snapshots), plus their namespace dirs for REMOVE below
    List<Path> skipTablePaths = Lists.newArrayList();
    List<Path> skipNsPaths = new ArrayList<>();
    for (TableName tableName : skipTables) {
      skipTablePaths.addAll(getDefaultTablePath(tableName, true));
      skipNsPaths.addAll(getDefaultNamespacePath(tableName.getNamespaceAsString(), false));
    }
    List<HDFSAclOperation> hdfsAclOperations =
        defaultNsPathList.stream().map(path -> new HDFSAclOperation(fs, path, users, operationType,
            READ_EXECUTE, true, skipTablePaths)).collect(Collectors.toList());
    if (operationType == HDFSAclOperation.OperationType.REMOVE) {
      // users keeping table-level access still need a (non-recursive) access acl on the
      // namespace dirs of the skipped tables, so re-add it
      hdfsAclOperations.addAll(skipNsPaths
          .stream().map(path -> new HDFSAclOperation(fs, path, users,
              HDFSAclOperation.OperationType.MODIFY, READ_EXECUTE, false, new ArrayList<>()))
          .collect(Collectors.toList()));
    }
    // snapshots of this namespace form the parallel half of the pair
    List<Path> namespaceSnapshotPaths = getNamespaceSnapshotPath(namespace);
    List<HDFSAclOperation> namespaceSnapshotHdfsAclOps =
        namespaceSnapshotPaths.stream().map(path -> new HDFSAclOperation(fs, path, users,
            operationType, READ_EXECUTE, true, skipTablePaths)).collect(Collectors.toList());
    return new Pair<>(hdfsAclOperations, namespaceSnapshotHdfsAclOps);
  }
+
  /**
   * Build the acl operations for a TABLE-scope permission change: a non-recursive access acl on
   * the namespace dirs (so the user can traverse them) and a recursive acl on the table dirs.
   * @param users the users whose acls change
   * @param operationType MODIFY or REMOVE
   * @param tableName the table whose paths are updated
   * @return pair of (sequential ns/table-dir operations, parallel table-snapshot operations)
   * @throws IOException if listing snapshots fails
   */
  private Pair<List<HDFSAclOperation>, List<HDFSAclOperation>> getTableHdfsAclOperations(
      Set<String> users, HDFSAclOperation.OperationType operationType, TableName tableName)
      throws IOException {
    String tableNamespace = tableName.getNamespaceAsString();
    // namespace dirs: access acl only, non-recursive (traversal)
    List<Path> tableNsPathList = getDefaultNamespacePath(tableNamespace, false);
    // table dirs: recursive default acl
    List<Path> tablePathList = getDefaultTablePath(tableName, false);
    // generate hdfs acl operations
    List<HDFSAclOperation> hdfsAclOperations =
        tableNsPathList.stream().map(path -> new HDFSAclOperation(fs, path, users, operationType,
            READ_EXECUTE, false, new ArrayList<>())).collect(Collectors.toList());
    hdfsAclOperations.addAll(tablePathList.stream().map(path -> new HDFSAclOperation(fs, path,
        users, operationType, READ_EXECUTE, true, new ArrayList<>())).collect(Collectors.toList()));
    // existing snapshots of the table form the parallel half of the pair
    List<Path> tableSnapshotPath = getTableSnapshotPath(tableName);
    List<HDFSAclOperation> tableSnapshotHdfsAclOps =
        tableSnapshotPath.stream().map(path -> new HDFSAclOperation(fs, path, users, operationType,
            READ_EXECUTE, true, new ArrayList<>())).collect(Collectors.toList());
    return new Pair<>(hdfsAclOperations, tableSnapshotHdfsAclOps);
  }
+
+  /**
+   * return paths that user will global permission will visit
+   * @return the path list
+   */
+  private List<Path> getDefaultGlobalPath() {
+    return Lists.newArrayList(pathHelper.getTmpDataDir(), 
pathHelper.getDataDir(),
+      pathHelper.getMobDataDir(), pathHelper.getArchiveDataDir());
+  }
+
+  /**
+   * return paths that user will namespace permission will visit
+   * @param namespace the namespace
+   * @param includeNamespaceSnapshot true if return the paths of namespace 
snapshots
+   * @return the path list
+   * @throws IOException if an error occurred
+   */
+  private List<Path> getDefaultNamespacePath(String namespace, boolean 
includeNamespaceSnapshot)
+      throws IOException {
+    List<Path> paths =
+        Lists.newArrayList(pathHelper.getTmpNsDir(namespace), 
pathHelper.getDataNsDir(namespace),
+          pathHelper.getMobDataNsDir(namespace), 
pathHelper.getArchiveNsDir(namespace));
+    if (includeNamespaceSnapshot) {
+      paths.addAll(getNamespaceSnapshotPath(namespace));
+    }
+    return paths;
+  }
+
+  private List<Path> getNamespaceSnapshotPath(String namespace) throws 
IOException {
+    return getNamespaceSnapshots(namespace).stream().map(snap -> 
pathHelper.getSnapshotDir(snap))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * return paths that user will table permission will visit
+   * @param tableName the table
+   * @param includeTableSnapshotPath true if return the paths of table 
snapshots
+   * @return the path list
+   * @throws IOException if an error occurred
+   */
+  private List<Path> getDefaultTablePath(TableName tableName, boolean 
includeTableSnapshotPath)
+      throws IOException {
+    List<Path> paths = Lists.newArrayList(pathHelper.getTmpTableDir(tableName),
+      pathHelper.getDataTableDir(tableName), 
pathHelper.getMobTableDir(tableName),
+      pathHelper.getArchiveTableDir(tableName));
+    if (includeTableSnapshotPath) {
+      paths.addAll(getTableSnapshotPath(tableName));
+    }
+    return paths;
+  }
+
+  private List<Path> getTableSnapshotPath(TableName tableName) throws 
IOException {
+    return getTableSnapshots(tableName).stream().map(snap -> 
pathHelper.getSnapshotDir(snap))
+        .collect(Collectors.toList());
+  }
+
+  /**
+   * Return users with namespace read permission
+   * @param namespace the namespace
+   * @return users with namespace read permission
+   * @throws IOException if an error occurred
+   */
+  private Set<String> getUsersWithNamespaceReadAction(String namespace) throws 
IOException {
+    return PermissionStorage.getNamespacePermissions(conf, 
namespace).entries().stream()
+        .filter(entry -> entry.getValue().getPermission().implies(READ))
+        .map(entry -> entry.getKey()).collect(Collectors.toSet());
+  }
+
+  /**
+   * Return users with table read permission
+   * @param tableName the table
+   * @return users with table read permission
+   * @throws IOException if an error occurred
+   */
+  private Set<String> getUsersWithTableReadAction(TableName tableName) throws 
IOException {
+    return PermissionStorage.getTablePermissions(conf, 
tableName).entries().stream()
+        .filter(entry -> entry.getValue().getPermission().implies(READ))
+        .map(entry -> entry.getKey()).collect(Collectors.toSet());
+  }
+
+  private List<String> getNamespaceSnapshots(String namespace) throws 
IOException {
+    return getSnapshots((snapDesc) -> 
TableName.valueOf(snapDesc.getTable()).getNamespaceAsString()
+        .equals(namespace));
+  }
+
+  private List<String> getTableSnapshots(TableName tableName) throws 
IOException {
+    return getSnapshots((snapDesc) -> TableName.valueOf(snapDesc.getTable()) 
== tableName);
+  }
+
+  private List<String> getSnapshots(Predicate<SnapshotDescription> predicate) 
throws IOException {
+    List<SnapshotDescription> snapshotDescriptions = admin.listSnapshots();
+    return snapshotDescriptions.stream()
+        .filter(snapshotDescription -> predicate.test(snapshotDescription))
+        .map(snapshotDescription -> 
snapshotDescription.getName()).collect(Collectors.toList());
+  }
+
  /** Returns the {@link PathHelper} used to resolve HBase directory-layout paths. */
  protected PathHelper getPathHelper() {
    return pathHelper;
  }
+
  /**
   * Apply a pair of {@link HDFSAclOperation} lists and wait for both to finish.
   * <p>
   * The second list is started first and runs in parallel on the pool; the first list is then
   * applied one root path at a time (each operation still fans out over child paths
   * asynchronously — see {@code setSingleHDFSAcl}).
   * @param hdfsAclOperations pair of (sequentially applied operations, parallel operations)
   * @throws InterruptedException if waiting on a future is interrupted
   * @throws ExecutionException if an operation future completed exceptionally
   */
  private void setHDFSAcl(Pair<List<HDFSAclOperation>, List<HDFSAclOperation>> hdfsAclOperations)
      throws InterruptedException, ExecutionException {
    // kick off the parallel batch before working through the sequential batch
    CompletableFuture<Void> future = setHDFSAclParallel(hdfsAclOperations.getSecond());
    for (HDFSAclOperation hdfsAclOperation : hdfsAclOperations.getFirst()) {
      setSingleHDFSAcl(hdfsAclOperation).get();
    }
    future.get();
  }
+
+  private CompletableFuture<Void> setSingleHDFSAcl(HDFSAclOperation acl) {
+    return CompletableFuture.supplyAsync(() -> {
+      try {
+        acl.setAcl();
+      } catch (IOException e) {
+        LOG.error("Set HDFS acl error for path {}", acl.path.toString(), e);
+      }
+      return acl;
+    }, pool).thenComposeAsync(acl2 -> {
+      List<HDFSAclOperation> childAclOperations = null;
+      try {
+        childAclOperations = acl2.getChildAclOperations();
+      } catch (IOException e) {
+        LOG.error("Set HDFS acl error for path {}", acl2.path.toString(), e);
+      }
+      return setHDFSAclParallel(childAclOperations);
+    }, pool);
+  }
+
+  private CompletableFuture<Void> setHDFSAclParallel(List<HDFSAclOperation> 
operations) {
+    List<CompletableFuture<Void>> futures = operations.stream()
+        .map(operation -> 
setSingleHDFSAcl(operation)).collect(Collectors.toList());
+    CompletableFuture<Void> future =
+        CompletableFuture.allOf(futures.toArray(new 
CompletableFuture[futures.size()]));
+    return future;
+  }
+
+  private static AclEntry aclEntry(AclEntryScope scope, String name, FsAction 
action) {
+    return new AclEntry.Builder().setScope(scope)
+        .setType(AuthUtil.isGroupPrincipal(name) ? GROUP : 
USER).setName(name).setPermission(action)
+        .build();
+  }
+
+  /**
+   * Inner class used to describe modify or remove what acl entries for files 
or directories(and
+   * child files)
+   */
+  private static class HDFSAclOperation {
+    enum OperationType {
+      MODIFY, REMOVE
+    }
+
+    interface Operation {
+      void apply(FileSystem fs, Path path, List<AclEntry> aclList) throws 
IOException;
+    }
+
+    private FileSystem fs;
+    private Path path;
+    private List<AclEntry> dirAcl;
+    private List<AclEntry> fileAcl;
+    private boolean recursive;
+    private Operation operation;
+    private List<Path> skipPaths;
+
+    HDFSAclOperation(FileSystem fs, Path path, Set<String> users, 
OperationType operationType,
+        FsAction fsAction, boolean recursive, List<Path> skipPaths) {
+      this.fs = fs;
+      this.path = path;
+      this.dirAcl = getDefaultAclEntries(users, fsAction);
+      this.fileAcl = getAccessAclEntries(users, fsAction);
+      this.recursive = recursive;
+      this.skipPaths = skipPaths;
+      if (operationType == OperationType.MODIFY) {
+        operation =
+            (fileSystem, modifyPath, aclList) -> 
fileSystem.modifyAclEntries(modifyPath, aclList);
+      } else if (operationType == OperationType.REMOVE) {
+        operation =
+            (fileSystem, modifyPath, aclList) -> 
fileSystem.removeAclEntries(modifyPath, aclList);
+      }
+    }
+
+    HDFSAclOperation(Path path, HDFSAclOperation parent) {
+      this.fs = parent.fs;
+      this.path = path;
+      this.dirAcl = parent.dirAcl;
+      this.fileAcl = parent.fileAcl;
+      this.operation = parent.operation;
+      this.recursive = parent.recursive;
+      this.skipPaths = parent.skipPaths;
+    }
+
+    List<HDFSAclOperation> getChildAclOperations() throws IOException {
+      List<HDFSAclOperation> hdfsAclOperations = new ArrayList<>();
+      if (!skipPaths.contains(path) && recursive && fs.isDirectory(path)) {
+        FileStatus[] fileStatuses = fs.listStatus(path);
+        for (FileStatus fileStatus : fileStatuses) {
+          hdfsAclOperations.add(new HDFSAclOperation(fileStatus.getPath(), 
this));
+        }
+      }
+      return hdfsAclOperations;
+    }
+
+    void setAcl() throws IOException {
+      if (!skipPaths.contains(path) && fs.exists(path)) {
+        if (fs.isDirectory(path)) {
+          if (recursive) {
+            operation.apply(fs, path, dirAcl);
+          } else {
+            operation.apply(fs, path, fileAcl);
+          }
+        } else {
+          operation.apply(fs, path, fileAcl);
+        }
+      }
+    }
+
+    private List<AclEntry> getAccessAclEntries(Set<String> users, FsAction 
action) {
+      List<AclEntry> aclList = new ArrayList<>();
+      for (String user : users) {
+        aclList.add(aclEntry(ACCESS, user, action));
+      }
+      return aclList;
+    }
+
+    private List<AclEntry> getDefaultAclEntries(Set<String> users, FsAction 
action) {
+      List<AclEntry> dirAclList = new ArrayList<>();
+      for (String user : users) {
+        dirAclList.add(aclEntry(ACCESS, user, action));
+        dirAclList.add(aclEntry(DEFAULT, user, action));
+      }
+      return dirAclList;
+    }
+  }
+
+  protected static final class PathHelper {
 
 Review comment:
   Also used in SnapshotScannerHDFSAclController

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to