[ 
https://issues.apache.org/jira/browse/HADOOP-19189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853042#comment-17853042
 ] 

ASF GitHub Bot commented on HADOOP-19189:
-----------------------------------------

virajjasani commented on code in PR #6857:
URL: https://github.com/apache/hadoop/pull/6857#discussion_r1630727716


##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java:
##########
@@ -72,121 +85,156 @@ public class ITestS3ACommitterFactory extends 
AbstractCommitITest {
    * Parameterized list of bindings of committer name in config file to
    * expected class instantiated.
    */
-  private static final Object[][] bindings = {
-      {COMMITTER_NAME_FILE, FileOutputCommitter.class},
-      {COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class},
-      {COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class},
-      {InternalCommitterConstants.COMMITTER_NAME_STAGING,
-          StagingCommitter.class},
-      {COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class}
+  private static final Object[][] BINDINGS = {
+      {"", "", FileOutputCommitter.class, "Default Binding"},
+      {COMMITTER_NAME_FILE, "", FileOutputCommitter.class, "File committer in 
FS"},
+      {COMMITTER_NAME_PARTITIONED, "", PartitionedStagingCommitter.class,
+          "partitoned committer in FS"},
+      {COMMITTER_NAME_STAGING, "", StagingCommitter.class, "staging committer 
in FS"},
+      {COMMITTER_NAME_MAGIC, "", MagicS3GuardCommitter.class, "magic committer 
in FS"},
+      {COMMITTER_NAME_DIRECTORY, "", DirectoryStagingCommitter.class, "Dir 
committer in FS"},
+      {INVALID_NAME, "", null, "invalid committer in FS"},
+
+      {"", COMMITTER_NAME_FILE, FileOutputCommitter.class, "File committer in 
task"},
+      {"", COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class,
+          "partioned committer in task"},
+      {"", COMMITTER_NAME_STAGING, StagingCommitter.class, "staging committer 
in task"},
+      {"", COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class, "magic committer 
in task"},
+      {"", COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class, "Dir 
committer in task"},
+      {"", INVALID_NAME, null, "invalid committer in task"},
   };
 
   /**
-   * This is a ref to the FS conf, so changes here are visible
-   * to callers querying the FS config.
+   * Test array for parameterized test runs.
+   *
+   * @return the committer binding for this run.
    */
-  private Configuration filesystemConfRef;
-
-  private Configuration taskConfRef;
+  @Parameterized.Parameters(name = "{3}-fs=[{0}]-task=[{1}]-[{2}]")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(BINDINGS);
+  }
 
-  @Override
-  public void setup() throws Exception {
-    super.setup();
-    jobId = randomJobId();
-    attempt0 = "attempt_" + jobId + "_m_000000_0";
-    taskAttempt0 = TaskAttemptID.forName(attempt0);
+  /**
+   * Name of committer to set in filesystem config. If "" do not set one.
+   */
+  private final String fsCommitterName;
 
-    outDir = path(getMethodName());
-    factory = new S3ACommitterFactory();
-    Configuration conf = new Configuration();
-    conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
-    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
-    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
-    filesystemConfRef = getFileSystem().getConf();
-    tContext = new TaskAttemptContextImpl(conf, taskAttempt0);
-    taskConfRef = tContext.getConfiguration();
-  }
+  /**
+   * Name of committer to set in job config.
+   */
+  private final String jobCommitterName;
 
-  @Test
-  public void testEverything() throws Throwable {
-    testImplicitFileBinding();
-    testBindingsInTask();
-    testBindingsInFSConfig();
-    testInvalidFileBinding();
-    testInvalidTaskBinding();
-  }
+  /**
+   * Expected committer class.
+   * If null: an exception is expected
+   */
+  private final Class<? extends AbstractS3ACommitter> committerClass;
 
   /**
-   * Verify that if all config options are unset, the FileOutputCommitter
-   *
-   * is returned.
+   * Description from parameters, simply for thread names to be more 
informative.
    */
-  public void testImplicitFileBinding() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-  }
+  private final String description;
 
   /**
-   * Verify that task bindings are picked up.
+   * Create a parameterized instance.
+   * @param fsCommitterName committer to set in filesystem config
+   * @param jobCommitterName committer to set in job config
+   * @param committerClass expected committer class
+   * @param description debug text for thread names.
    */
-  public void testBindingsInTask() throws Throwable {
-    // set this to an invalid value to be confident it is not
-    // being checked.
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID");
-    taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-    for (Object[] binding : bindings) {
-      taskConfRef.set(FS_S3A_COMMITTER_NAME,
-          (String) binding[0]);
-      assertFactoryCreatesExpectedCommitter((Class) binding[1]);
-    }
+  public ITestS3ACommitterFactory(
+      final String fsCommitterName,
+      final String jobCommitterName,
+      final Class<? extends AbstractS3ACommitter> committerClass,
+      final String description) {
+    this.fsCommitterName = fsCommitterName;
+    this.jobCommitterName = jobCommitterName;
+    this.committerClass = committerClass;
+    this.description = description;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    // do not cache, because we want the committer one to pick up
+    // the fs with fs-specific configuration
+    conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, false);
+    removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_NAME);
+    maybeSetCommitterName(conf, fsCommitterName);
+    return conf;
   }
 
   /**
-   * Verify that FS bindings are picked up.
+   * Set a committer name in a configuration.
+   * @param conf configuration to patch.
+   * @param name name. If "" the option is unset.
    */
-  public void testBindingsInFSConfig() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-    for (Object[] binding : bindings) {
-      taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]);
-      assertFactoryCreatesExpectedCommitter((Class) binding[1]);
+  private static void maybeSetCommitterName(final Configuration conf, final 
String name) {
+    if (!name.isEmpty()) {
+      conf.set(FS_S3A_COMMITTER_NAME, name);
+    } else {
+      conf.unset(FS_S3A_COMMITTER_NAME);
     }
   }
 
-  /**
-   * Create an invalid committer via the FS binding.
-   */
-  public void testInvalidFileBinding() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
-    LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
-        () -> createCommitter());
+  @Override
+  public void setup() throws Exception {
+    // destroy all filesystems from previous runs.
+    FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+    super.setup();
+    jobId = randomJobId();
+    attempt0 = "attempt_" + jobId + "_m_000000_0";
+    taskAttempt0 = TaskAttemptID.forName(attempt0);
+
+    outDir = methodPath();
+    factory = new S3ACommitterFactory();
+    final Configuration fsConf = getConfiguration();
+    JobConf jobConf = new JobConf(fsConf);
+    jobConf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
+    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    maybeSetCommitterName(jobConf, jobCommitterName);
+    tContext = new TaskAttemptContextImpl(jobConf, taskAttempt0);
+
+    LOG.info("Filesystem Committer='{}'; task='{}'",
+        fsConf.get(FS_S3A_COMMITTER_NAME),
+        jobConf.get(FS_S3A_COMMITTER_NAME));

Review Comment:
   Could we also log `description` here, to make debugging easier?



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java:
##########
@@ -72,121 +85,156 @@ public class ITestS3ACommitterFactory extends 
AbstractCommitITest {
    * Parameterized list of bindings of committer name in config file to
    * expected class instantiated.
    */
-  private static final Object[][] bindings = {
-      {COMMITTER_NAME_FILE, FileOutputCommitter.class},
-      {COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class},
-      {COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class},
-      {InternalCommitterConstants.COMMITTER_NAME_STAGING,
-          StagingCommitter.class},
-      {COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class}
+  private static final Object[][] BINDINGS = {
+      {"", "", FileOutputCommitter.class, "Default Binding"},
+      {COMMITTER_NAME_FILE, "", FileOutputCommitter.class, "File committer in 
FS"},
+      {COMMITTER_NAME_PARTITIONED, "", PartitionedStagingCommitter.class,
+          "partitoned committer in FS"},
+      {COMMITTER_NAME_STAGING, "", StagingCommitter.class, "staging committer 
in FS"},
+      {COMMITTER_NAME_MAGIC, "", MagicS3GuardCommitter.class, "magic committer 
in FS"},
+      {COMMITTER_NAME_DIRECTORY, "", DirectoryStagingCommitter.class, "Dir 
committer in FS"},
+      {INVALID_NAME, "", null, "invalid committer in FS"},
+
+      {"", COMMITTER_NAME_FILE, FileOutputCommitter.class, "File committer in 
task"},
+      {"", COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class,
+          "partioned committer in task"},
+      {"", COMMITTER_NAME_STAGING, StagingCommitter.class, "staging committer 
in task"},
+      {"", COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class, "magic committer 
in task"},
+      {"", COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class, "Dir 
committer in task"},
+      {"", INVALID_NAME, null, "invalid committer in task"},
   };
 
   /**
-   * This is a ref to the FS conf, so changes here are visible
-   * to callers querying the FS config.
+   * Test array for parameterized test runs.
+   *
+   * @return the committer binding for this run.
    */
-  private Configuration filesystemConfRef;
-
-  private Configuration taskConfRef;
+  @Parameterized.Parameters(name = "{3}-fs=[{0}]-task=[{1}]-[{2}]")
+  public static Collection<Object[]> params() {
+    return Arrays.asList(BINDINGS);
+  }
 
-  @Override
-  public void setup() throws Exception {
-    super.setup();
-    jobId = randomJobId();
-    attempt0 = "attempt_" + jobId + "_m_000000_0";
-    taskAttempt0 = TaskAttemptID.forName(attempt0);
+  /**
+   * Name of committer to set in filesystem config. If "" do not set one.
+   */
+  private final String fsCommitterName;
 
-    outDir = path(getMethodName());
-    factory = new S3ACommitterFactory();
-    Configuration conf = new Configuration();
-    conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
-    conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
-    conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
-    filesystemConfRef = getFileSystem().getConf();
-    tContext = new TaskAttemptContextImpl(conf, taskAttempt0);
-    taskConfRef = tContext.getConfiguration();
-  }
+  /**
+   * Name of committer to set in job config.
+   */
+  private final String jobCommitterName;
 
-  @Test
-  public void testEverything() throws Throwable {
-    testImplicitFileBinding();
-    testBindingsInTask();
-    testBindingsInFSConfig();
-    testInvalidFileBinding();
-    testInvalidTaskBinding();
-  }
+  /**
+   * Expected committer class.
+   * If null: an exception is expected
+   */
+  private final Class<? extends AbstractS3ACommitter> committerClass;
 
   /**
-   * Verify that if all config options are unset, the FileOutputCommitter
-   *
-   * is returned.
+   * Description from parameters, simply for thread names to be more 
informative.
    */
-  public void testImplicitFileBinding() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-  }
+  private final String description;
 
   /**
-   * Verify that task bindings are picked up.
+   * Create a parameterized instance.
+   * @param fsCommitterName committer to set in filesystem config
+   * @param jobCommitterName committer to set in job config
+   * @param committerClass expected committer class
+   * @param description debug text for thread names.
    */
-  public void testBindingsInTask() throws Throwable {
-    // set this to an invalid value to be confident it is not
-    // being checked.
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID");
-    taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-    for (Object[] binding : bindings) {
-      taskConfRef.set(FS_S3A_COMMITTER_NAME,
-          (String) binding[0]);
-      assertFactoryCreatesExpectedCommitter((Class) binding[1]);
-    }
+  public ITestS3ACommitterFactory(
+      final String fsCommitterName,
+      final String jobCommitterName,
+      final Class<? extends AbstractS3ACommitter> committerClass,
+      final String description) {
+    this.fsCommitterName = fsCommitterName;
+    this.jobCommitterName = jobCommitterName;
+    this.committerClass = committerClass;
+    this.description = description;
+  }
+
+  @Override
+  protected Configuration createConfiguration() {
+    final Configuration conf = super.createConfiguration();
+    // do not cache, because we want the committer one to pick up
+    // the fs with fs-specific configuration
+    conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, false);
+    removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_NAME);
+    maybeSetCommitterName(conf, fsCommitterName);
+    return conf;
   }
 
   /**
-   * Verify that FS bindings are picked up.
+   * Set a committer name in a configuration.
+   * @param conf configuration to patch.
+   * @param name name. If "" the option is unset.
    */
-  public void testBindingsInFSConfig() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
-    assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
-    for (Object[] binding : bindings) {
-      taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]);
-      assertFactoryCreatesExpectedCommitter((Class) binding[1]);
+  private static void maybeSetCommitterName(final Configuration conf, final 
String name) {
+    if (!name.isEmpty()) {
+      conf.set(FS_S3A_COMMITTER_NAME, name);
+    } else {
+      conf.unset(FS_S3A_COMMITTER_NAME);
     }
   }
 
-  /**
-   * Create an invalid committer via the FS binding.
-   */
-  public void testInvalidFileBinding() throws Throwable {
-    taskConfRef.unset(FS_S3A_COMMITTER_NAME);
-    filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
-    LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
-        () -> createCommitter());
+  @Override
+  public void setup() throws Exception {
+    // destroy all filesystems from previous runs.
+    FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+    super.setup();
+    jobId = randomJobId();
+    attempt0 = "attempt_" + jobId + "_m_000000_0";
+    taskAttempt0 = TaskAttemptID.forName(attempt0);
+
+    outDir = methodPath();
+    factory = new S3ACommitterFactory();
+    final Configuration fsConf = getConfiguration();
+    JobConf jobConf = new JobConf(fsConf);
+    jobConf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
+    jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+    jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+    maybeSetCommitterName(jobConf, jobCommitterName);
+    tContext = new TaskAttemptContextImpl(jobConf, taskAttempt0);
+
+    LOG.info("Filesystem Committer='{}'; task='{}'",
+        fsConf.get(FS_S3A_COMMITTER_NAME),
+        jobConf.get(FS_S3A_COMMITTER_NAME));
+  }
+
+
+  @Override
+  protected void deleteTestDirInTeardown() {
+    // no-op
   }
 
   /**
-   * Create an invalid committer via the task attempt.
+   * Verify that if all config options are unset, the FileOutputCommitter
+   * is returned.
    */
-  public void testInvalidTaskBinding() throws Throwable {
-    filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
-    taskConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
-    LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
-        () -> createCommitter());
+  @Test
+  public void testBinding() throws Throwable {
+    assertFactoryCreatesExpectedCommitter(committerClass);
   }
 
   /**
    * Assert that the factory creates the expected committer.
+   * If a null committer is passed in, a {@link PathIOException}
+   * is expected.
    * @param expected expected committer class.
-   * @throws IOException IO failure.
+   * @throws Exception IO failure.
    */
-  protected void assertFactoryCreatesExpectedCommitter(
+  private void assertFactoryCreatesExpectedCommitter(
       final Class expected)
-      throws IOException {
-    assertEquals("Wrong Committer from factory",
-        expected,
-        createCommitter().getClass());
+      throws Exception {
+    describe("Creating committer: expected class \"%s\"", expected);
+    if (expected != null) {
+      assertEquals("Wrong Committer from factory",
+          expected,
+          createCommitter().getClass());
+    } else {
+      intercept(PathCommitException.class, () ->
+          createCommitter());

Review Comment:
   nit: use the method reference `this::createCommitter` instead of a lambda?





> ITestS3ACommitterFactory failing
> --------------------------------
>
>                 Key: HADOOP-19189
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19189
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3, test
>    Affects Versions: 3.4.0
>            Reporter: Steve Loughran
>            Priority: Minor
>              Labels: pull-request-available
>
> We've had ITestS3ACommitterFactory failing for a while, where it looks like 
> changed committer settings aren't being picked up.
> {code}
> [ERROR] 
> ITestS3ACommitterFactory.testEverything:115->testInvalidFileBinding:165 
> Expected a org.apache.hadoop.fs.s3a.commit.PathCommitException to be thrown, 
> but got the result: : 
> FileOutputCommitter{PathOutputCommitter{context=TaskAttemptContextImpl{JobContextImpl
> {code}
> I've spent some time looking at it and it is happening because the test sets 
> the filesystem ref for the local test fs, and not that of the filesystem 
> created by the committer, which is where the option is picked up.
> I've tried to parameterize it, but things are still playing up and I'm not 
> sure how hard to try to fix it.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to