virajjasani commented on code in PR #6857:
URL: https://github.com/apache/hadoop/pull/6857#discussion_r1630727716


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java:
##########
@@ -72,121 +85,156 @@ public class ITestS3ACommitterFactory extends 
AbstractCommitITest {
    * Parameterized list of bindings of committer name in config file to
    * expected class instantiated.
    */
-  private static final Object[][] bindings = {
-      {COMMITTER_NAME_FILE, FileOutputCommitter.class},
-      {COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class},
-      {COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class},
-      {InternalCommitterConstants.COMMITTER_NAME_STAGING,
-          StagingCommitter.class},
-      {COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class}
+  private static final Object[][] BINDINGS = {
+      {"", "", FileOutputCommitter.class, "Default Binding"},
+      {COMMITTER_NAME_FILE, "", FileOutputCommitter.class, "File committer in 
FS"},
+      {COMMITTER_NAME_PARTITIONED, "", PartitionedStagingCommitter.class,
+          "partitoned committer in FS"},
+      {COMMITTER_NAME_STAGING, "", StagingCommitter.class, "staging committer 
in FS"},
+      {COMMITTER_NAME_MAGIC, "", MagicS3GuardCommitter.class, "magic committer 
in FS"},
+      {COMMITTER_NAME_DIRECTORY, "", DirectoryStagingCommitter.class, "Dir 
committer in FS"},
+      {INVALID_NAME, "", null, "invalid committer in FS"},
+
+      {"", COMMITTER_NAME_FILE, FileOutputCommitter.class, "File committer in 
task"},
+      {"", COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class,
+          "partioned committer in task"},
+      {"", COMMITTER_NAME_STAGING, StagingCommitter.class, "staging committer 
in task"},
+      {"", COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class, "magic committer 
in task"},
+      {"", COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class, "Dir 
committer in task"},
+      {"", INVALID_NAME, null, "invalid committer in task"},
   };
 
   /**
-   * This is a ref to the FS conf, so changes here are visible
-   * to callers querying the FS config.
+   * Test array for parameterized test runs.
+   *
+   * @return the committer binding for this run.
    */
-  private Configuration filesystemConfRef;
-
-  private Configuration taskConfRef;
+  @Parameterized.Parameters(name = "{3}-fs=[{0}]-task=[{1}]-[{2}]")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(BINDINGS);
+  }
 
-  @Override
-  public void setup() throws Exception {
-    super.setup();
-    jobId = randomJobId();
-    attempt0 = "attempt_" + jobId + "_m_000000_0";
-    taskAttempt0 = TaskAttemptID.forName(attempt0);
+  /**
+   * Name of committer to set in filesystem config. If "" do not set one.
+   */
+  private final String fsCommitterName;
 
-    outDir = path(getMethodName());
-    factory = new S3ACommitterFactory();
-    Configuration conf = new Configuration();
-    conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
-    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
-    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
-    filesystemConfRef = getFileSystem().getConf();
-    tContext = new TaskAttemptContextImpl(conf, taskAttempt0);
-    taskConfRef = tContext.getConfiguration();
-  }
+  /**
+   * Name of committer to set in job config.
+   */
+  private final String jobCommitterName;
 
-  @Test
-  public void testEverything() throws Throwable {
-    testImplicitFileBinding();
-    testBindingsInTask();
-    testBindingsInFSConfig();
-    testInvalidFileBinding();
-    testInvalidTaskBinding();
-  }
+  /**
+   * Expected committer class.
+   * If null: an exception is expected
+   */
+  private final Class<? extends AbstractS3ACommitter> committerClass;
 
   /**
-   * Verify that if all config options are unset, the FileOutputCommitter
-   *
-   * is returned.
+   * Description from parameters, simply for thread names to be more 
informative.
    */
-  public void testImplicitFileBinding() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-  }
+  private final String description;
 
   /**
-   * Verify that task bindings are picked up.
+   * Create a parameterized instance.
+   * @param fsCommitterName committer to set in filesystem config
+   * @param jobCommitterName committer to set in job config
+   * @param committerClass expected committer class
+   * @param description debug text for thread names.
    */
-  public void testBindingsInTask() throws Throwable {
-    // set this to an invalid value to be confident it is not
-    // being checked.
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID");
-    taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-    for (Object[] binding : bindings) {
-      taskConfRef.set(FS_S3A_COMMITTER_NAME,
-          (String) binding[0]);
-      assertFactoryCreatesExpectedCommitter((Class) binding[1]);
-    }
+  public ITestS3ACommitterFactory(
+      final String fsCommitterName,
+      final String jobCommitterName,
+      final Class<? extends AbstractS3ACommitter> committerClass,
+      final String description) {
+    this.fsCommitterName = fsCommitterName;
+    this.jobCommitterName = jobCommitterName;
+    this.committerClass = committerClass;
+    this.description = description;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    // do not cache, because we want the committer one to pick up
+    // the fs with fs-specific configuration
+    conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, false);
+    removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_NAME);
+    maybeSetCommitterName(conf, fsCommitterName);
+    return conf;
   }
 
   /**
-   * Verify that FS bindings are picked up.
+   * Set a committer name in a configuration.
+   * @param conf configuration to patch.
+   * @param name name. If "" the option is unset.
    */
-  public void testBindingsInFSConfig() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-    for (Object[] binding : bindings) {
-      taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]);
-      assertFactoryCreatesExpectedCommitter((Class) binding[1]);
+  private static void maybeSetCommitterName(final Configuration conf, final 
String name) {
+    if (!name.isEmpty()) {
+      conf.set(FS_S3A_COMMITTER_NAME, name);
+    } else {
+      conf.unset(FS_S3A_COMMITTER_NAME);
     }
   }
 
-  /**
-   * Create an invalid committer via the FS binding.
-   */
-  public void testInvalidFileBinding() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
-    LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
-        () -> createCommitter());
+  @Override
+  public void setup() throws Exception {
+    // destroy all filesystems from previous runs.
+    FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+    super.setup();
+    jobId = randomJobId();
+    attempt0 = "attempt_" + jobId + "_m_000000_0";
+    taskAttempt0 = TaskAttemptID.forName(attempt0);
+
+    outDir = methodPath();
+    factory = new S3ACommitterFactory();
+    final Configuration fsConf = getConfiguration();
+    JobConf jobConf = new JobConf(fsConf);
+    jobConf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
+    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    maybeSetCommitterName(jobConf, jobCommitterName);
+    tContext = new TaskAttemptContextImpl(jobConf, taskAttempt0);
+
+    LOG.info("Filesystem Committer='{}'; task='{}'",
+        fsConf.get(FS_S3A_COMMITTER_NAME),
+        jobConf.get(FS_S3A_COMMITTER_NAME));

Review Comment:
   we can also log `description` here for better debugging?



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java:
##########
@@ -72,121 +85,156 @@ public class ITestS3ACommitterFactory extends 
AbstractCommitITest {
    * Parameterized list of bindings of committer name in config file to
    * expected class instantiated.
    */
-  private static final Object[][] bindings = {
-      {COMMITTER_NAME_FILE, FileOutputCommitter.class},
-      {COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class},
-      {COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class},
-      {InternalCommitterConstants.COMMITTER_NAME_STAGING,
-          StagingCommitter.class},
-      {COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class}
+  private static final Object[][] BINDINGS = {
+      {"", "", FileOutputCommitter.class, "Default Binding"},
+      {COMMITTER_NAME_FILE, "", FileOutputCommitter.class, "File committer in 
FS"},
+      {COMMITTER_NAME_PARTITIONED, "", PartitionedStagingCommitter.class,
+          "partitoned committer in FS"},
+      {COMMITTER_NAME_STAGING, "", StagingCommitter.class, "staging committer 
in FS"},
+      {COMMITTER_NAME_MAGIC, "", MagicS3GuardCommitter.class, "magic committer 
in FS"},
+      {COMMITTER_NAME_DIRECTORY, "", DirectoryStagingCommitter.class, "Dir 
committer in FS"},
+      {INVALID_NAME, "", null, "invalid committer in FS"},
+
+      {"", COMMITTER_NAME_FILE, FileOutputCommitter.class, "File committer in 
task"},
+      {"", COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class,
+          "partioned committer in task"},
+      {"", COMMITTER_NAME_STAGING, StagingCommitter.class, "staging committer 
in task"},
+      {"", COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class, "magic committer 
in task"},
+      {"", COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class, "Dir 
committer in task"},
+      {"", INVALID_NAME, null, "invalid committer in task"},
   };
 
   /**
-   * This is a ref to the FS conf, so changes here are visible
-   * to callers querying the FS config.
+   * Test array for parameterized test runs.
+   *
+   * @return the committer binding for this run.
    */
-  private Configuration filesystemConfRef;
-
-  private Configuration taskConfRef;
+  @Parameterized.Parameters(name = "{3}-fs=[{0}]-task=[{1}]-[{2}]")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(BINDINGS);
+  }
 
-  @Override
-  public void setup() throws Exception {
-    super.setup();
-    jobId = randomJobId();
-    attempt0 = "attempt_" + jobId + "_m_000000_0";
-    taskAttempt0 = TaskAttemptID.forName(attempt0);
+  /**
+   * Name of committer to set in filesystem config. If "" do not set one.
+   */
+  private final String fsCommitterName;
 
-    outDir = path(getMethodName());
-    factory = new S3ACommitterFactory();
-    Configuration conf = new Configuration();
-    conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
-    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
-    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
-    filesystemConfRef = getFileSystem().getConf();
-    tContext = new TaskAttemptContextImpl(conf, taskAttempt0);
-    taskConfRef = tContext.getConfiguration();
-  }
+  /**
+   * Name of committer to set in job config.
+   */
+  private final String jobCommitterName;
 
-  @Test
-  public void testEverything() throws Throwable {
-    testImplicitFileBinding();
-    testBindingsInTask();
-    testBindingsInFSConfig();
-    testInvalidFileBinding();
-    testInvalidTaskBinding();
-  }
+  /**
+   * Expected committer class.
+   * If null: an exception is expected
+   */
+  private final Class<? extends AbstractS3ACommitter> committerClass;
 
   /**
-   * Verify that if all config options are unset, the FileOutputCommitter
-   *
-   * is returned.
+   * Description from parameters, simply for thread names to be more 
informative.
    */
-  public void testImplicitFileBinding() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-  }
+  private final String description;
 
   /**
-   * Verify that task bindings are picked up.
+   * Create a parameterized instance.
+   * @param fsCommitterName committer to set in filesystem config
+   * @param jobCommitterName committer to set in job config
+   * @param committerClass expected committer class
+   * @param description debug text for thread names.
    */
-  public void testBindingsInTask() throws Throwable {
-    // set this to an invalid value to be confident it is not
-    // being checked.
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID");
-    taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-    for (Object[] binding : bindings) {
-      taskConfRef.set(FS_S3A_COMMITTER_NAME,
-          (String) binding[0]);
-      assertFactoryCreatesExpectedCommitter((Class) binding[1]);
-    }
+  public ITestS3ACommitterFactory(
+      final String fsCommitterName,
+      final String jobCommitterName,
+      final Class<? extends AbstractS3ACommitter> committerClass,
+      final String description) {
+    this.fsCommitterName = fsCommitterName;
+    this.jobCommitterName = jobCommitterName;
+    this.committerClass = committerClass;
+    this.description = description;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    // do not cache, because we want the committer one to pick up
+    // the fs with fs-specific configuration
+    conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, false);
+    removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_NAME);
+    maybeSetCommitterName(conf, fsCommitterName);
+    return conf;
   }
 
   /**
-   * Verify that FS bindings are picked up.
+   * Set a committer name in a configuration.
+   * @param conf configuration to patch.
+   * @param name name. If "" the option is unset.
    */
-  public void testBindingsInFSConfig() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-    for (Object[] binding : bindings) {
-      taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]);
-      assertFactoryCreatesExpectedCommitter((Class) binding[1]);
+  private static void maybeSetCommitterName(final Configuration conf, final 
String name) {
+    if (!name.isEmpty()) {
+      conf.set(FS_S3A_COMMITTER_NAME, name);
+    } else {
+      conf.unset(FS_S3A_COMMITTER_NAME);
     }
   }
 
-  /**
-   * Create an invalid committer via the FS binding.
-   */
-  public void testInvalidFileBinding() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
-    LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
-        () -> createCommitter());
+  @Override
+  public void setup() throws Exception {
+    // destroy all filesystems from previous runs.
+    FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+    super.setup();
+    jobId = randomJobId();
+    attempt0 = "attempt_" + jobId + "_m_000000_0";
+    taskAttempt0 = TaskAttemptID.forName(attempt0);
+
+    outDir = methodPath();
+    factory = new S3ACommitterFactory();
+    final Configuration fsConf = getConfiguration();
+    JobConf jobConf = new JobConf(fsConf);
+    jobConf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
+    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    maybeSetCommitterName(jobConf, jobCommitterName);
+    tContext = new TaskAttemptContextImpl(jobConf, taskAttempt0);
+
+    LOG.info("Filesystem Committer='{}'; task='{}'",
+        fsConf.get(FS_S3A_COMMITTER_NAME),
+        jobConf.get(FS_S3A_COMMITTER_NAME));
+  }
+
+
+  @Override
+  protected void deleteTestDirInTeardown() {
+    // no-op
   }
 
   /**
-   * Create an invalid committer via the task attempt.
+   * Verify that if all config options are unset, the FileOutputCommitter
+   * is returned.
    */
-  public void testInvalidTaskBinding() throws Throwable {
-    filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
-    taskConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
-    LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
-        () -> createCommitter());
+  @Test
+  public void testBinding() throws Throwable {
+    assertFactoryCreatesExpectedCommitter(committerClass);
   }
 
   /**
    * Assert that the factory creates the expected committer.
+   * If a null committer is passed in, a {@link PathIOException}
+   * is expected.
    * @param expected expected committer class.
-   * @throws IOException IO failure.
+   * @throws Exception IO failure.
    */
-  protected void assertFactoryCreatesExpectedCommitter(
+  private void assertFactoryCreatesExpectedCommitter(
       final Class expected)
-      throws IOException {
-    assertEquals("Wrong Committer from factory",
-        expected,
-        createCommitter().getClass());
+      throws Exception {
+    describe("Creating committer: expected class \"%s\"", expected);
+    if (expected != null) {
+      assertEquals("Wrong Committer from factory",
+          expected,
+          createCommitter().getClass());
+    } else {
+      intercept(PathCommitException.class, () ->
+          createCommitter());

Review Comment:
   nit: reference `this::createCommitter` instead of lambda?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to