[ 
https://issues.apache.org/jira/browse/HADOOP-19254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17910650#comment-17910650
 ] 

ASF GitHub Bot commented on HADOOP-19254:
-----------------------------------------

steveloughran commented on code in PR #7197:
URL: https://github.com/apache/hadoop/pull/7197#discussion_r1905562029


##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/BulkDeleteCommand.java:
##########
@@ -1,84 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BulkDelete;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.stream.Collectors;
-
 public class BulkDeleteCommand extends FsCommand {
-    public static void registerCommands(CommandFactory factory) {
-        factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
-    }
 
-    public static final String name = "bulkDelete";
+  public static void registerCommands(CommandFactory factory) {
+    factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
+  }
 
-    public static final String READ_FROM_FILE = "readFromFile";
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkDeleteCommand.class.getName());
 
-    public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] 
[<basePath> <paths>]";
+  public static final String NAME = "bulkDelete";
 
-    public static final String DESCRIPTION = "Deletes the set of files under 
the given path. If a list of paths " +
-            "is provided then the paths are deleted directly. User can also 
point to the file where the paths are" +
-            "listed as full object names.";
+  /**
+   * File Name parameter to be specified at command line.
+   */
+  public static final String READ_FROM_FILE = "readFromFile";
 
-    private String fileName;
+  /**
+   * Page size parameter specified at command line.
+   */
+  public static final String PAGE_SIZE = "pageSize";
 
-    /*
-    Making the class stateful as the PathData initialization for all args is 
not needed
-     */
-    LinkedList<String> childArgs;
 
-    protected BulkDeleteCommand() {
-        this.childArgs = new LinkedList<>();
+  public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] [" + 
PAGE_SIZE
+          + "] [<pageSize>] [<basePath> <paths>]";
+
+  public static final String DESCRIPTION = "Deletes the set of files under the 
given <path>.\n" +
+          "If a list of paths is provided at command line then the paths are 
deleted directly.\n" +
+          "User can also point to the file where the paths are listed as full 
object names using the \"fileName\"" +
+          "parameter. The presence of a file name takes precedence over the 
list of objects.\n" +
+          "Page size refers to the size of each bulk delete batch." +
+          "Users can specify the page size using \"pageSize\" command 
parameter." +
+          "Default value is 1.\n";
+
+  private String fileName;
+
+  private int pageSize;
+
+  /**
+   * Making the class stateful as the PathData initialization for all args is 
not needed
+   */
+  LinkedList<String> childArgs;
+
+  protected BulkDeleteCommand() {
+    this.childArgs = new LinkedList<>();
+  }
+
+  protected BulkDeleteCommand(Configuration conf) {
+    super(conf);
+    this.childArgs = new LinkedList<>();
+    this.pageSize = 1;
+  }
+
+  /**
+   * Processes the command line options and initialize the variables.
+   *
+   * @param args the command line arguments
+   * @throws IOException in case of wrong arguments passed
+   */
+  @Override
+  protected void processOptions(LinkedList<String> args) throws IOException {
+    CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE);
+    cf.addOptionWithValue(READ_FROM_FILE);
+    cf.addOptionWithValue(PAGE_SIZE);
+    cf.parse(args);
+    fileName = cf.getOptValue(READ_FROM_FILE);
+    if (cf.getOptValue(PAGE_SIZE) != null) {
+      pageSize = Integer.parseInt(cf.getOptValue(PAGE_SIZE));
+    } else {
+      pageSize = 1;
     }
+  }
 
-    protected BulkDeleteCommand(Configuration conf) {super(conf);}
+  /**
+   * Processes the command line arguments and stores the child arguments in a 
list.
+   *
+   * @param args strings to expand into {@link PathData} objects
+   * @return the base path of the bulk delete command.
+   * @throws IOException if the wrong number of arguments specified
+   */
+  @Override
+  protected LinkedList<PathData> expandArguments(LinkedList<String> args) 
throws IOException {
+    if (fileName == null && args.size() < 2) {
+      throw new IOException("Invalid Number of Arguments. Expected more");
+    }
+    LinkedList<PathData> pathData = new LinkedList<>();
+    pathData.add(new PathData(args.get(0), getConf()));
+    args.remove(0);
+    this.childArgs = args;
+    return pathData;
+  }
 
-    @Override
-    protected void processOptions(LinkedList<String> args) throws IOException {
-        CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE);
-        cf.addOptionWithValue(READ_FROM_FILE);
-        cf.parse(args);
-        fileName = cf.getOptValue(READ_FROM_FILE);
+  /**
+   * Deletes the objects using the bulk delete api.
+   *
+   * @param bulkDelete Bulkdelete object exposing the API
+   * @param paths      list of paths to be deleted in the base path
+   * @throws IOException on error in execution of the delete command
+   */
+  void deleteInBatches(BulkDelete bulkDelete, List<Path> paths) throws 
IOException {
+    Batch<Path> batches = new Batch<>(paths, pageSize);
+    while (batches.hasNext()) {
+      try {
+        List<Map.Entry<Path, String>> result = 
bulkDelete.bulkDelete(batches.next());
+        LOG.debug("Deleted Result:{}", result.toString());

Review Comment:
   include the count of the number of failed deletions (i.e. the result map size), maybe at 
log warn; print the actual values at debug



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/BulkDeleteCommand.java:
##########
@@ -1,84 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BulkDelete;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.stream.Collectors;
-
 public class BulkDeleteCommand extends FsCommand {
-    public static void registerCommands(CommandFactory factory) {
-        factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
-    }
 
-    public static final String name = "bulkDelete";
+  public static void registerCommands(CommandFactory factory) {
+    factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
+  }
 
-    public static final String READ_FROM_FILE = "readFromFile";
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkDeleteCommand.class.getName());
 
-    public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] 
[<basePath> <paths>]";
+  public static final String NAME = "bulkDelete";
 
-    public static final String DESCRIPTION = "Deletes the set of files under 
the given path. If a list of paths " +
-            "is provided then the paths are deleted directly. User can also 
point to the file where the paths are" +
-            "listed as full object names.";
+  /**
+   * File Name parameter to be specified at command line.
+   */
+  public static final String READ_FROM_FILE = "readFromFile";
 
-    private String fileName;
+  /**
+   * Page size parameter specified at command line.
+   */
+  public static final String PAGE_SIZE = "pageSize";
 
-    /*
-    Making the class stateful as the PathData initialization for all args is 
not needed
-     */
-    LinkedList<String> childArgs;
 
-    protected BulkDeleteCommand() {
-        this.childArgs = new LinkedList<>();
+  public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] [" + 
PAGE_SIZE
+          + "] [<pageSize>] [<basePath> <paths>]";
+
+  public static final String DESCRIPTION = "Deletes the set of files under the 
given <path>.\n" +
+          "If a list of paths is provided at command line then the paths are 
deleted directly.\n" +
+          "User can also point to the file where the paths are listed as full 
object names using the \"fileName\"" +
+          "parameter. The presence of a file name takes precedence over the 
list of objects.\n" +
+          "Page size refers to the size of each bulk delete batch." +
+          "Users can specify the page size using \"pageSize\" command 
parameter." +
+          "Default value is 1.\n";
+
+  private String fileName;
+
+  private int pageSize;
+
+  /**
+   * Making the class stateful as the PathData initialization for all args is 
not needed
+   */
+  LinkedList<String> childArgs;
+
+  protected BulkDeleteCommand() {
+    this.childArgs = new LinkedList<>();
+  }
+
+  protected BulkDeleteCommand(Configuration conf) {
+    super(conf);
+    this.childArgs = new LinkedList<>();
+    this.pageSize = 1;
+  }
+
+  /**
+   * Processes the command line options and initialize the variables.
+   *
+   * @param args the command line arguments
+   * @throws IOException in case of wrong arguments passed
+   */
+  @Override
+  protected void processOptions(LinkedList<String> args) throws IOException {
+    CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE);
+    cf.addOptionWithValue(READ_FROM_FILE);
+    cf.addOptionWithValue(PAGE_SIZE);
+    cf.parse(args);
+    fileName = cf.getOptValue(READ_FROM_FILE);
+    if (cf.getOptValue(PAGE_SIZE) != null) {
+      pageSize = Integer.parseInt(cf.getOptValue(PAGE_SIZE));
+    } else {
+      pageSize = 1;
     }
+  }
 
-    protected BulkDeleteCommand(Configuration conf) {super(conf);}
+  /**
+   * Processes the command line arguments and stores the child arguments in a 
list.
+   *
+   * @param args strings to expand into {@link PathData} objects
+   * @return the base path of the bulk delete command.
+   * @throws IOException if the wrong number of arguments specified
+   */
+  @Override
+  protected LinkedList<PathData> expandArguments(LinkedList<String> args) 
throws IOException {
+    if (fileName == null && args.size() < 2) {
+      throw new IOException("Invalid Number of Arguments. Expected more");
+    }
+    LinkedList<PathData> pathData = new LinkedList<>();
+    pathData.add(new PathData(args.get(0), getConf()));
+    args.remove(0);
+    this.childArgs = args;
+    return pathData;
+  }
 
-    @Override
-    protected void processOptions(LinkedList<String> args) throws IOException {
-        CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE);
-        cf.addOptionWithValue(READ_FROM_FILE);
-        cf.parse(args);
-        fileName = cf.getOptValue(READ_FROM_FILE);
+  /**
+   * Deletes the objects using the bulk delete api.
+   *
+   * @param bulkDelete Bulkdelete object exposing the API
+   * @param paths      list of paths to be deleted in the base path
+   * @throws IOException on error in execution of the delete command
+   */
+  void deleteInBatches(BulkDelete bulkDelete, List<Path> paths) throws 
IOException {
+    Batch<Path> batches = new Batch<>(paths, pageSize);
+    while (batches.hasNext()) {
+      try {
+        List<Map.Entry<Path, String>> result = 
bulkDelete.bulkDelete(batches.next());
+        LOG.debug("Deleted Result:{}", result.toString());
+      } catch (IllegalArgumentException e) {
+        LOG.error("Caught exception while deleting", e);
+        throw new IOException(e);
+      }
     }
+  }
 
-    @Override
-    protected LinkedList<PathData> expandArguments(LinkedList<String> args) 
throws IOException {
-        if(fileName == null && args.size() < 2) {
-            throw new IOException("Invalid Number of Arguments. Expected 
more");
+  @Override
+  protected void processArguments(LinkedList<PathData> args) throws 
IOException {
+    PathData basePath = args.get(0);
+    LOG.info("Deleting files under:{}", basePath);
+    List<Path> pathList = new ArrayList<>();
+    if (fileName != null) {
+      LOG.info("Reading from file:{}", fileName);
+      FileSystem localFile = FileSystem.get(getConf());
+      BufferedReader br = new BufferedReader(new InputStreamReader(
+              localFile.open(new Path(fileName)), StandardCharsets.UTF_8));
+      String line;
+      while ((line = br.readLine()) != null) {
+        if (!line.startsWith("#")) {
+          pathList.add(new Path(line));
         }
-        LinkedList<PathData> pathData = new LinkedList<>();
-        pathData.add(new PathData(args.get(0), getConf()));
-        args.remove(0);
-        this.childArgs = args;
-        return pathData;
+      }
+      br.close();
+    } else {
+      
pathList.addAll(this.childArgs.stream().map(Path::new).collect(Collectors.toList()));
     }
+    LOG.debug("Deleting:{}", pathList);
+    BulkDelete bulkDelete = basePath.fs.createBulkDelete(basePath.path);
+    deleteInBatches(bulkDelete, pathList);
+  }
 
-    @Override
-    protected void processArguments(LinkedList<PathData> args) throws 
IOException {
-        PathData basePath = args.get(0);
-        out.println("Deleting files under:" + basePath);
-        List<Path> pathList = new ArrayList<>();
-        if(fileName != null) {
-            FileSystem localFile = FileSystem.get(getConf());
-            BufferedReader br = new BufferedReader(new 
InputStreamReader(localFile.open(new Path(fileName))));
-            String line;
-            while((line = br.readLine()) != null) {
-                pathList.add(new Path(line));
-            }
-        } else {
-            
pathList.addAll(this.childArgs.stream().map(Path::new).collect(Collectors.toList()));
-        }
-        BulkDelete bulkDelete = basePath.fs.createBulkDelete(basePath.path);
-        bulkDelete.bulkDelete(pathList);
+  /**
+   * Batch class for deleting files in batches, once initialized the inner 
list can't be modified.
+   *
+   * @param <T> template type for batches
+   */
+  static class Batch<T> {

Review Comment:
   make the `Batch` class `private`



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestBulkDeleteCommand.java:
##########
@@ -1,45 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.*;

Review Comment:
   expand the wildcard import (`java.io.*`) into explicit imports



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestBulkDeleteCommand.java:
##########
@@ -1,45 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.HadoopTestBase;
+import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.LinkedList;
+public class TestBulkDeleteCommand extends HadoopTestBase {
+  private static Configuration conf;
+  private static FsShell shell;
+  private static LocalFileSystem lfs;
+  private static Path testRootDir;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new Configuration();
+    shell = new FsShell(conf);
+    lfs = FileSystem.getLocal(conf);
+    testRootDir = lfs.makeQualified(new Path(GenericTestUtils.getTempPath(
+            "testFsShellBulkDelete")));
+    lfs.delete(testRootDir, true);
+    lfs.mkdirs(testRootDir);
+    lfs.setWorkingDirectory(testRootDir);
+  }
 
-public class TestBulkDeleteCommand {
-    private static Configuration conf;
+  @Test
+  public void testDefaults() throws IOException {
+    LinkedList<String> options = new LinkedList<>();
+    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
+    bulkDeleteCommand.processOptions(options);
+    assertTrue(bulkDeleteCommand.childArgs.isEmpty());
+  }
 
-    @BeforeClass
-    public static void setup() throws IOException {
-        conf = new Configuration();
-    }
+  @Test
+  public void testArguments() throws IOException, URISyntaxException {
+    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
+    LinkedList<String> arguments = new LinkedList<>();
+    String arg1 = "file:///file/name/1";
+    String arg2 = "file:///file/name/1/2";
+    arguments.add(arg1);
+    arguments.add(arg2);
+    LinkedList<PathData> pathData = 
bulkDeleteCommand.expandArguments(arguments);
+    Assertions.assertThat(pathData.size()).
+            describedAs("Only one root path must be present").isEqualTo(1);
+    Assertions.assertThat(pathData.get(0).path.toUri().getPath()).
+            describedAs("Base path of the command should match").isEqualTo(new 
URI(arg1).getPath());
+    Assertions.assertThat(bulkDeleteCommand.childArgs.size()).

Review Comment:
   use AssertJ list assertions, e.g. `.hasSize()`



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestBulkDeleteCommand.java:
##########
@@ -1,45 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;

Review Comment:
   expand the wildcard import (`org.apache.hadoop.fs.*`) into explicit imports



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestBulkDeleteCommand.java:
##########
@@ -1,45 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.HadoopTestBase;
+import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.LinkedList;
+public class TestBulkDeleteCommand extends HadoopTestBase {

Review Comment:
   prefer extending `AbstractHadoopTestBase`



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/BulkDeleteCommand.java:
##########
@@ -1,84 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BulkDelete;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.stream.Collectors;
-
 public class BulkDeleteCommand extends FsCommand {
-    public static void registerCommands(CommandFactory factory) {
-        factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
-    }
 
-    public static final String name = "bulkDelete";
+  public static void registerCommands(CommandFactory factory) {
+    factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
+  }
 
-    public static final String READ_FROM_FILE = "readFromFile";
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkDeleteCommand.class.getName());
 
-    public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] 
[<basePath> <paths>]";
+  public static final String NAME = "bulkDelete";
 
-    public static final String DESCRIPTION = "Deletes the set of files under 
the given path. If a list of paths " +
-            "is provided then the paths are deleted directly. User can also 
point to the file where the paths are" +
-            "listed as full object names.";
+  /**
+   * File Name parameter to be specified at command line.
+   */
+  public static final String READ_FROM_FILE = "readFromFile";
 
-    private String fileName;
+  /**
+   * Page size parameter specified at command line.
+   */
+  public static final String PAGE_SIZE = "pageSize";
 
-    /*
-    Making the class stateful as the PathData initialization for all args is 
not needed
-     */
-    LinkedList<String> childArgs;
 
-    protected BulkDeleteCommand() {
-        this.childArgs = new LinkedList<>();
+  public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] [" + 
PAGE_SIZE
+          + "] [<pageSize>] [<basePath> <paths>]";
+
+  public static final String DESCRIPTION = "Deletes the set of files under the 
given <path>.\n" +
+          "If a list of paths is provided at command line then the paths are 
deleted directly.\n" +
+          "User can also point to the file where the paths are listed as full 
object names using the \"fileName\"" +
+          "parameter. The presence of a file name takes precedence over the 
list of objects.\n" +
+          "Page size refers to the size of each bulk delete batch." +
+          "Users can specify the page size using \"pageSize\" command 
parameter." +
+          "Default value is 1.\n";
+
+  private String fileName;
+
+  private int pageSize;
+
+  /**
+   * Making the class stateful as the PathData initialization for all args is 
not needed
+   */
+  LinkedList<String> childArgs;
+
+  protected BulkDeleteCommand() {
+    this.childArgs = new LinkedList<>();
+  }
+
+  protected BulkDeleteCommand(Configuration conf) {
+    super(conf);
+    this.childArgs = new LinkedList<>();
+    this.pageSize = 1;
+  }
+
+  /**
+   * Processes the command line options and initialize the variables.
+   *
+   * @param args the command line arguments
+   * @throws IOException in case of wrong arguments passed
+   */
+  @Override
+  protected void processOptions(LinkedList<String> args) throws IOException {
+    CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE);
+    cf.addOptionWithValue(READ_FROM_FILE);
+    cf.addOptionWithValue(PAGE_SIZE);
+    cf.parse(args);
+    fileName = cf.getOptValue(READ_FROM_FILE);
+    if (cf.getOptValue(PAGE_SIZE) != null) {
+      pageSize = Integer.parseInt(cf.getOptValue(PAGE_SIZE));
+    } else {
+      pageSize = 1;
     }
+  }
 
-    protected BulkDeleteCommand(Configuration conf) {super(conf);}
+  /**
+   * Processes the command line arguments and stores the child arguments in a 
list.
+   *
+   * @param args strings to expand into {@link PathData} objects
+   * @return the base path of the bulk delete command.
+   * @throws IOException if the wrong number of arguments specified
+   */
+  @Override
+  protected LinkedList<PathData> expandArguments(LinkedList<String> args) 
throws IOException {
+    if (fileName == null && args.size() < 2) {
+      throw new IOException("Invalid Number of Arguments. Expected more");
+    }
+    LinkedList<PathData> pathData = new LinkedList<>();
+    pathData.add(new PathData(args.get(0), getConf()));
+    args.remove(0);
+    this.childArgs = args;
+    return pathData;
+  }
 
-    @Override
-    protected void processOptions(LinkedList<String> args) throws IOException {
-        CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE);
-        cf.addOptionWithValue(READ_FROM_FILE);
-        cf.parse(args);
-        fileName = cf.getOptValue(READ_FROM_FILE);
+  /**
+   * Deletes the objects using the bulk delete api.
+   *
+   * @param bulkDelete Bulkdelete object exposing the API
+   * @param paths      list of paths to be deleted in the base path
+   * @throws IOException on error in execution of the delete command
+   */
+  void deleteInBatches(BulkDelete bulkDelete, List<Path> paths) throws 
IOException {
+    Batch<Path> batches = new Batch<>(paths, pageSize);
+    while (batches.hasNext()) {
+      try {
+        List<Map.Entry<Path, String>> result = 
bulkDelete.bulkDelete(batches.next());
+        LOG.debug("Deleted Result:{}", result.toString());
+      } catch (IllegalArgumentException e) {
+        LOG.error("Caught exception while deleting", e);
+        throw new IOException(e);
+      }
     }
+  }
 
-    @Override
-    protected LinkedList<PathData> expandArguments(LinkedList<String> args) 
throws IOException {
-        if(fileName == null && args.size() < 2) {
-            throw new IOException("Invalid Number of Arguments. Expected 
more");
+  @Override
+  protected void processArguments(LinkedList<PathData> args) throws 
IOException {
+    PathData basePath = args.get(0);
+    LOG.info("Deleting files under:{}", basePath);
+    List<Path> pathList = new ArrayList<>();
+    if (fileName != null) {
+      LOG.info("Reading from file:{}", fileName);
+      FileSystem localFile = FileSystem.get(getConf());
+      BufferedReader br = new BufferedReader(new InputStreamReader(
+              localFile.open(new Path(fileName)), StandardCharsets.UTF_8));
+      String line;
+      while ((line = br.readLine()) != null) {
+        if (!line.startsWith("#")) {
+          pathList.add(new Path(line));
         }
-        LinkedList<PathData> pathData = new LinkedList<>();
-        pathData.add(new PathData(args.get(0), getConf()));
-        args.remove(0);
-        this.childArgs = args;
-        return pathData;
+      }
+      br.close();
+    } else {
+      
pathList.addAll(this.childArgs.stream().map(Path::new).collect(Collectors.toList()));

Review Comment:
   nit
   * remove the `this.`
   * put each stage on a separate line, for visibility and debugging



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestBulkDeleteCommand.java:
##########
@@ -1,45 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.HadoopTestBase;
+import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.LinkedList;
+public class TestBulkDeleteCommand extends HadoopTestBase {
+  private static Configuration conf;
+  private static FsShell shell;
+  private static LocalFileSystem lfs;
+  private static Path testRootDir;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new Configuration();
+    shell = new FsShell(conf);
+    lfs = FileSystem.getLocal(conf);
+    testRootDir = lfs.makeQualified(new Path(GenericTestUtils.getTempPath(
+            "testFsShellBulkDelete")));
+    lfs.delete(testRootDir, true);
+    lfs.mkdirs(testRootDir);
+    lfs.setWorkingDirectory(testRootDir);
+  }
 
-public class TestBulkDeleteCommand {
-    private static Configuration conf;
+  @Test
+  public void testDefaults() throws IOException {
+    LinkedList<String> options = new LinkedList<>();
+    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
+    bulkDeleteCommand.processOptions(options);
+    assertTrue(bulkDeleteCommand.childArgs.isEmpty());

Review Comment:
   use the appropriate AssertJ list assert — it'll dump the list on a failure



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/BulkDeleteCommand.java:
##########
@@ -1,84 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BulkDelete;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.stream.Collectors;
-
 public class BulkDeleteCommand extends FsCommand {
-    public static void registerCommands(CommandFactory factory) {
-        factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
-    }
 
-    public static final String name = "bulkDelete";
+  public static void registerCommands(CommandFactory factory) {
+    factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
+  }
 
-    public static final String READ_FROM_FILE = "readFromFile";
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkDeleteCommand.class.getName());
 
-    public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] 
[<basePath> <paths>]";
+  public static final String NAME = "bulkDelete";
 
-    public static final String DESCRIPTION = "Deletes the set of files under 
the given path. If a list of paths " +
-            "is provided then the paths are deleted directly. User can also 
point to the file where the paths are" +
-            "listed as full object names.";
+  /**
+   * File Name parameter to be specified at command line.
+   */
+  public static final String READ_FROM_FILE = "readFromFile";
 
-    private String fileName;
+  /**
+   * Page size parameter specified at command line.
+   */
+  public static final String PAGE_SIZE = "pageSize";
 
-    /*
-    Making the class stateful as the PathData initialization for all args is 
not needed
-     */
-    LinkedList<String> childArgs;
 
-    protected BulkDeleteCommand() {
-        this.childArgs = new LinkedList<>();
+  public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] [" + 
PAGE_SIZE
+          + "] [<pageSize>] [<basePath> <paths>]";
+
+  public static final String DESCRIPTION = "Deletes the set of files under the 
given <path>.\n" +
+          "If a list of paths is provided at command line then the paths are 
deleted directly.\n" +
+          "User can also point to the file where the paths are listed as full 
object names using the \"fileName\"" +
+          "parameter. The presence of a file name takes precedence over the 
list of objects.\n" +
+          "Page size refers to the size of each bulk delete batch." +
+          "Users can specify the page size using \"pageSize\" command 
parameter." +
+          "Default value is 1.\n";
+
+  private String fileName;
+
+  private int pageSize;
+
+  /**
+   * Making the class stateful as the PathData initialization for all args is 
not needed

Review Comment:
   needs a . at the end or some javadoc versions fail



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/BulkDeleteCommand.java:
##########
@@ -1,84 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BulkDelete;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.stream.Collectors;
-
 public class BulkDeleteCommand extends FsCommand {
-    public static void registerCommands(CommandFactory factory) {
-        factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
-    }
 
-    public static final String name = "bulkDelete";
+  public static void registerCommands(CommandFactory factory) {
+    factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
+  }
 
-    public static final String READ_FROM_FILE = "readFromFile";
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkDeleteCommand.class.getName());
 
-    public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] 
[<basePath> <paths>]";
+  public static final String NAME = "bulkDelete";
 
-    public static final String DESCRIPTION = "Deletes the set of files under 
the given path. If a list of paths " +
-            "is provided then the paths are deleted directly. User can also 
point to the file where the paths are" +
-            "listed as full object names.";
+  /**
+   * File Name parameter to be specified at command line.
+   */
+  public static final String READ_FROM_FILE = "readFromFile";
 
-    private String fileName;
+  /**
+   * Page size parameter specified at command line.
+   */
+  public static final String PAGE_SIZE = "pageSize";
 
-    /*
-    Making the class stateful as the PathData initialization for all args is 
not needed
-     */
-    LinkedList<String> childArgs;
 
-    protected BulkDeleteCommand() {
-        this.childArgs = new LinkedList<>();
+  public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] [" + 
PAGE_SIZE
+          + "] [<pageSize>] [<basePath> <paths>]";
+
+  public static final String DESCRIPTION = "Deletes the set of files under the 
given <path>.\n" +
+          "If a list of paths is provided at command line then the paths are 
deleted directly.\n" +
+          "User can also point to the file where the paths are listed as full 
object names using the \"fileName\"" +
+          "parameter. The presence of a file name takes precedence over the 
list of objects.\n" +
+          "Page size refers to the size of each bulk delete batch." +
+          "Users can specify the page size using \"pageSize\" command 
parameter." +
+          "Default value is 1.\n";
+
+  private String fileName;
+
+  private int pageSize;
+
+  /**
+   * Making the class stateful as the PathData initialization for all args is 
not needed
+   */
+  LinkedList<String> childArgs;
+
+  protected BulkDeleteCommand() {
+    this.childArgs = new LinkedList<>();
+  }
+
+  protected BulkDeleteCommand(Configuration conf) {
+    super(conf);
+    this.childArgs = new LinkedList<>();
+    this.pageSize = 1;
+  }
+
+  /**
+   * Processes the command line options and initialize the variables.
+   *
+   * @param args the command line arguments
+   * @throws IOException in case of wrong arguments passed
+   */
+  @Override
+  protected void processOptions(LinkedList<String> args) throws IOException {
+    CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE);
+    cf.addOptionWithValue(READ_FROM_FILE);
+    cf.addOptionWithValue(PAGE_SIZE);
+    cf.parse(args);
+    fileName = cf.getOptValue(READ_FROM_FILE);
+    if (cf.getOptValue(PAGE_SIZE) != null) {
+      pageSize = Integer.parseInt(cf.getOptValue(PAGE_SIZE));
+    } else {
+      pageSize = 1;
     }
+  }
 
-    protected BulkDeleteCommand(Configuration conf) {super(conf);}
+  /**
+   * Processes the command line arguments and stores the child arguments in a 
list.
+   *
+   * @param args strings to expand into {@link PathData} objects
+   * @return the base path of the bulk delete command.
+   * @throws IOException if the wrong number of arguments specified
+   */
+  @Override
+  protected LinkedList<PathData> expandArguments(LinkedList<String> args) 
throws IOException {
+    if (fileName == null && args.size() < 2) {
+      throw new IOException("Invalid Number of Arguments. Expected more");

Review Comment:
   better to show a usage string (just define it as a constant). 
   
   the command tools aren't great at reporting exceptions, so print it before 
throwing



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/BulkDeleteCommand.java:
##########
@@ -1,84 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.*;

Review Comment:
   sorry, confused you. We shouldn't have actual `.*` wildcard imports; leave them as static
   stuff down below.



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/BulkDeleteCommand.java:
##########
@@ -1,84 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BulkDelete;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.stream.Collectors;
-
 public class BulkDeleteCommand extends FsCommand {
-    public static void registerCommands(CommandFactory factory) {
-        factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
-    }
 
-    public static final String name = "bulkDelete";
+  public static void registerCommands(CommandFactory factory) {
+    factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
+  }
 
-    public static final String READ_FROM_FILE = "readFromFile";
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkDeleteCommand.class.getName());
 
-    public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] 
[<basePath> <paths>]";
+  public static final String NAME = "bulkDelete";
 
-    public static final String DESCRIPTION = "Deletes the set of files under 
the given path. If a list of paths " +
-            "is provided then the paths are deleted directly. User can also 
point to the file where the paths are" +
-            "listed as full object names.";
+  /**
+   * File Name parameter to be specified at command line.
+   */
+  public static final String READ_FROM_FILE = "readFromFile";
 
-    private String fileName;
+  /**
+   * Page size parameter specified at command line.
+   */
+  public static final String PAGE_SIZE = "pageSize";
 
-    /*
-    Making the class stateful as the PathData initialization for all args is 
not needed
-     */
-    LinkedList<String> childArgs;
 
-    protected BulkDeleteCommand() {
-        this.childArgs = new LinkedList<>();
+  public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] [" + 
PAGE_SIZE
+          + "] [<pageSize>] [<basePath> <paths>]";
+
+  public static final String DESCRIPTION = "Deletes the set of files under the 
given <path>.\n" +
+          "If a list of paths is provided at command line then the paths are 
deleted directly.\n" +
+          "User can also point to the file where the paths are listed as full 
object names using the \"fileName\"" +
+          "parameter. The presence of a file name takes precedence over the 
list of objects.\n" +
+          "Page size refers to the size of each bulk delete batch." +
+          "Users can specify the page size using \"pageSize\" command 
parameter." +
+          "Default value is 1.\n";
+
+  private String fileName;
+
+  private int pageSize;
+
+  /**
+   * Making the class stateful as the PathData initialization for all args is 
not needed
+   */
+  LinkedList<String> childArgs;
+
+  protected BulkDeleteCommand() {
+    this.childArgs = new LinkedList<>();
+  }
+
+  protected BulkDeleteCommand(Configuration conf) {
+    super(conf);
+    this.childArgs = new LinkedList<>();
+    this.pageSize = 1;
+  }
+
+  /**
+   * Processes the command line options and initialize the variables.
+   *
+   * @param args the command line arguments
+   * @throws IOException in case of wrong arguments passed
+   */
+  @Override
+  protected void processOptions(LinkedList<String> args) throws IOException {
+    CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE);
+    cf.addOptionWithValue(READ_FROM_FILE);
+    cf.addOptionWithValue(PAGE_SIZE);
+    cf.parse(args);
+    fileName = cf.getOptValue(READ_FROM_FILE);
+    if (cf.getOptValue(PAGE_SIZE) != null) {
+      pageSize = Integer.parseInt(cf.getOptValue(PAGE_SIZE));
+    } else {
+      pageSize = 1;
     }
+  }
 
-    protected BulkDeleteCommand(Configuration conf) {super(conf);}
+  /**
+   * Processes the command line arguments and stores the child arguments in a 
list.
+   *
+   * @param args strings to expand into {@link PathData} objects
+   * @return the base path of the bulk delete command.
+   * @throws IOException if the wrong number of arguments specified
+   */
+  @Override
+  protected LinkedList<PathData> expandArguments(LinkedList<String> args) 
throws IOException {
+    if (fileName == null && args.size() < 2) {
+      throw new IOException("Invalid Number of Arguments. Expected more");
+    }
+    LinkedList<PathData> pathData = new LinkedList<>();
+    pathData.add(new PathData(args.get(0), getConf()));
+    args.remove(0);
+    this.childArgs = args;
+    return pathData;
+  }
 
-    @Override
-    protected void processOptions(LinkedList<String> args) throws IOException {
-        CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE);
-        cf.addOptionWithValue(READ_FROM_FILE);
-        cf.parse(args);
-        fileName = cf.getOptValue(READ_FROM_FILE);
+  /**
+   * Deletes the objects using the bulk delete api.
+   *
+   * @param bulkDelete Bulkdelete object exposing the API
+   * @param paths      list of paths to be deleted in the base path
+   * @throws IOException on error in execution of the delete command
+   */
+  void deleteInBatches(BulkDelete bulkDelete, List<Path> paths) throws 
IOException {
+    Batch<Path> batches = new Batch<>(paths, pageSize);
+    while (batches.hasNext()) {
+      try {
+        List<Map.Entry<Path, String>> result = 
bulkDelete.bulkDelete(batches.next());
+        LOG.debug("Deleted Result:{}", result.toString());
+      } catch (IllegalArgumentException e) {
+        LOG.error("Caught exception while deleting", e);
+        throw new IOException(e);
+      }
     }
+  }
 
-    @Override
-    protected LinkedList<PathData> expandArguments(LinkedList<String> args) 
throws IOException {
-        if(fileName == null && args.size() < 2) {
-            throw new IOException("Invalid Number of Arguments. Expected 
more");
+  @Override
+  protected void processArguments(LinkedList<PathData> args) throws 
IOException {
+    PathData basePath = args.get(0);
+    LOG.info("Deleting files under:{}", basePath);
+    List<Path> pathList = new ArrayList<>();
+    if (fileName != null) {
+      LOG.info("Reading from file:{}", fileName);
+      FileSystem localFile = FileSystem.get(getConf());
+      BufferedReader br = new BufferedReader(new InputStreamReader(
+              localFile.open(new Path(fileName)), StandardCharsets.UTF_8));
+      String line;
+      while ((line = br.readLine()) != null) {
+        if (!line.startsWith("#")) {
+          pathList.add(new Path(line));

Review Comment:
   maybe trim it? tricky one that; means you can't delete paths with trailing 
spaces



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestBulkDeleteCommand.java:
##########
@@ -1,45 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.HadoopTestBase;
+import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.LinkedList;
+public class TestBulkDeleteCommand extends HadoopTestBase {
+  private static Configuration conf;
+  private static FsShell shell;
+  private static LocalFileSystem lfs;
+  private static Path testRootDir;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new Configuration();
+    shell = new FsShell(conf);
+    lfs = FileSystem.getLocal(conf);
+    testRootDir = lfs.makeQualified(new Path(GenericTestUtils.getTempPath(
+            "testFsShellBulkDelete")));
+    lfs.delete(testRootDir, true);
+    lfs.mkdirs(testRootDir);
+    lfs.setWorkingDirectory(testRootDir);
+  }
 
-public class TestBulkDeleteCommand {
-    private static Configuration conf;
+  @Test
+  public void testDefaults() throws IOException {
+    LinkedList<String> options = new LinkedList<>();
+    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
+    bulkDeleteCommand.processOptions(options);
+    assertTrue(bulkDeleteCommand.childArgs.isEmpty());
+  }
 
-    @BeforeClass
-    public static void setup() throws IOException {
-        conf = new Configuration();
-    }
+  @Test
+  public void testArguments() throws IOException, URISyntaxException {
+    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
+    LinkedList<String> arguments = new LinkedList<>();
+    String arg1 = "file:///file/name/1";
+    String arg2 = "file:///file/name/1/2";
+    arguments.add(arg1);
+    arguments.add(arg2);
+    LinkedList<PathData> pathData = 
bulkDeleteCommand.expandArguments(arguments);
+    Assertions.assertThat(pathData.size()).

Review Comment:
   use the AssertJ list assertion `.hasSize()`



##########
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestBulkDeleteCommand.java:
##########
@@ -1,45 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.hadoop.fs.shell;
 
+import java.io.*;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.HadoopTestBase;
+import org.assertj.core.api.Assertions;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.util.LinkedList;
+public class TestBulkDeleteCommand extends HadoopTestBase {
+  private static Configuration conf;
+  private static FsShell shell;
+  private static LocalFileSystem lfs;
+  private static Path testRootDir;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+  @BeforeClass
+  public static void setup() throws IOException {
+    conf = new Configuration();
+    shell = new FsShell(conf);
+    lfs = FileSystem.getLocal(conf);
+    testRootDir = lfs.makeQualified(new Path(GenericTestUtils.getTempPath(
+            "testFsShellBulkDelete")));
+    lfs.delete(testRootDir, true);
+    lfs.mkdirs(testRootDir);
+    lfs.setWorkingDirectory(testRootDir);
+  }
 
-public class TestBulkDeleteCommand {
-    private static Configuration conf;
+  @Test
+  public void testDefaults() throws IOException {
+    LinkedList<String> options = new LinkedList<>();
+    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
+    bulkDeleteCommand.processOptions(options);
+    assertTrue(bulkDeleteCommand.childArgs.isEmpty());
+  }
 
-    @BeforeClass
-    public static void setup() throws IOException {
-        conf = new Configuration();
-    }
+  @Test
+  public void testArguments() throws IOException, URISyntaxException {
+    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
+    LinkedList<String> arguments = new LinkedList<>();
+    String arg1 = "file:///file/name/1";
+    String arg2 = "file:///file/name/1/2";
+    arguments.add(arg1);
+    arguments.add(arg2);
+    LinkedList<PathData> pathData = 
bulkDeleteCommand.expandArguments(arguments);
+    Assertions.assertThat(pathData.size()).
+            describedAs("Only one root path must be present").isEqualTo(1);
+    Assertions.assertThat(pathData.get(0).path.toUri().getPath()).
+            describedAs("Base path of the command should match").isEqualTo(new 
URI(arg1).getPath());
+    Assertions.assertThat(bulkDeleteCommand.childArgs.size()).
+            describedAs("Only one other argument was passed to the command").
+            isEqualTo(1);
+    Assertions.assertThat(bulkDeleteCommand.childArgs.get(0)).
+            describedAs("Children arguments must match").isEqualTo(arg2);
+  }
 
-    @Test
-    public void testDefaults() throws IOException {
-        LinkedList<String> options = new LinkedList<>();
-        BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand();
-        bulkDeleteCommand.processOptions(options);
-        assertTrue(bulkDeleteCommand.childArgs.isEmpty());
+  @Test
+  public void testWrongArguments() throws IOException, URISyntaxException {
+    BulkDeleteCommand bulkDeleteCommand = new BulkDeleteCommand(conf);
+    LinkedList<String> arguments = new LinkedList<>();
+    String arg1 = "file:///file/name/1";
+    arguments.add(arg1);
+    Assertions.assertThatThrownBy(() -> 
bulkDeleteCommand.expandArguments(arguments)).

Review Comment:
   this is fine. though we normally use our own `LambdaTestUtils.intercept` 
which is better because
   
   * checks exception class and optional string
   * uses string value returned by the lambda expression in failure message 
(really useful!)
   * returns the exception for further analysis
   * I wrote it 🙂



##########
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/BulkDeleteCommand.java:
##########
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.shell;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BulkDelete;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BulkDeleteCommand extends FsCommand {
+
+  public static void registerCommands(CommandFactory factory) {
+    factory.addClass(BulkDeleteCommand.class, "-bulkDelete");
+  }
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(BulkDeleteCommand.class.getName());
+
+  public static final String NAME = "bulkDelete";
+
+  /**
+   * File Name parameter to be specified at command line.
+   */
+  public static final String READ_FROM_FILE = "readFromFile";
+
+  /**
+   * Page size parameter specified at command line.
+   */
+  public static final String PAGE_SIZE = "pageSize";
+
+
+  public static final String USAGE = "-[ " + READ_FROM_FILE + "] [<file>] [" + 
PAGE_SIZE
+          + "] [<pageSize>] [<basePath> <paths>]";
+
+  public static final String DESCRIPTION = "Deletes the set of files under the 
given <path>.\n" +
+          "If a list of paths is provided at command line then the paths are 
deleted directly.\n" +
+          "User can also point to the file where the paths are listed as full 
object names using the \"fileName\"" +
+          "parameter. The presence of a file name takes precedence over the 
list of objects.\n" +
+          "Page size refers to the size of each bulk delete batch." +
+          "Users can specify the page size using \"pageSize\" command 
parameter." +
+          "Default value is 1.\n";
+
+  private String fileName;
+
+  private int pageSize;
+
+  /*
+  Making the class stateful as the PathData initialization for all args is not 
needed
+   */ LinkedList<String> childArgs;
+
+  protected BulkDeleteCommand() {
+    this.childArgs = new LinkedList<>();
+  }
+
+  protected BulkDeleteCommand(Configuration conf) {
+    super(conf);
+  }
+
+  /**
+   * Processes the command line options and initialize the variables.
+   *
+   * @param args the command line arguments
+   * @throws IOException in case of wrong arguments passed
+   */
+  @Override
+  protected void processOptions(LinkedList<String> args) throws IOException {
+    CommandFormat cf = new CommandFormat(0, Integer.MAX_VALUE);
+    cf.addOptionWithValue(READ_FROM_FILE);
+    cf.addOptionWithValue(PAGE_SIZE);
+    cf.parse(args);
+    fileName = cf.getOptValue(READ_FROM_FILE);
+    if (cf.getOptValue(PAGE_SIZE) != null) {
+      pageSize = Integer.parseInt(cf.getOptValue(PAGE_SIZE));
+    } else {
+      pageSize = 1;
+    }
+  }
+
+  /**
+   * Processes the command line arguments and stores the child arguments in a 
list.
+   *
+   * @param args strings to expand into {@link PathData} objects
+   * @return the base path of the bulk delete command.
+   * @throws IOException if the wrong number of arguments specified
+   */
+  @Override
+  protected LinkedList<PathData> expandArguments(LinkedList<String> args) 
throws IOException {
+    if (fileName == null && args.size() < 2) {
+      throw new IOException("Invalid Number of Arguments. Expected more");
+    }
+    LinkedList<PathData> pathData = new LinkedList<>();
+    pathData.add(new PathData(args.get(0), getConf()));
+    args.remove(0);
+    this.childArgs = args;
+    return pathData;
+  }
+
+  /**
+   * Deletes the objects using the bulk delete api.
+   *
+   * @param bulkDelete Bulkdelete object exposing the API
+   * @param paths      list of paths to be deleted in the base path
+   * @throws IOException on error in execution of the delete command
+   */
+  void deleteInBatches(BulkDelete bulkDelete, List<Path> paths) throws 
IOException {
+    Batch<Path> batches = new Batch<>(paths, pageSize);
+    while (batches.hasNext()) {
+      try {
+        List<Map.Entry<Path, String>> result = 
bulkDelete.bulkDelete(batches.next());
+        LOG.debug("Deleted Result:{}", result.toString());
+      } catch (IllegalArgumentException e) {
+        LOG.error("Caught exception while deleting", e);

Review Comment:
   make it "Exception while deleting"





> Implement bulk delete command as hadoop fs command operation 
> -------------------------------------------------------------
>
>                 Key: HADOOP-19254
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19254
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: fs
>    Affects Versions: 3.4.1
>            Reporter: Mukund Thakur
>            Assignee: Harshit Gupta
>            Priority: Major
>              Labels: pull-request-available
>
> {code}
> hadoop fs -bulkdelete <base-url> <file> 
> {code}
> Key uses
> * QE: Testing from python and other scripting languages
> * cluster maintenance: actual bulk deletion operations from the store
> one thought there: we MUST qualify paths with / elements: if a passed-in path 
> ends in /, it means "delete a marker", not "delete a dir". And if it doesn't 
> have one then it's an object. This makes it possible for the command to be 
> used to delete surplus markers, or paths where there is a file above another 
> file — cloudstore listobjects finds this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to