[
https://issues.apache.org/jira/browse/HADOOP-19189?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853042#comment-17853042
]
ASF GitHub Bot commented on HADOOP-19189:
-----------------------------------------
virajjasani commented on code in PR #6857:
URL: https://github.com/apache/hadoop/pull/6857#discussion_r1630727716
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java:
##########
@@ -72,121 +85,156 @@ public class ITestS3ACommitterFactory extends
AbstractCommitITest {
* Parameterized list of bindings of committer name in config file to
* expected class instantiated.
*/
- private static final Object[][] bindings = {
- {COMMITTER_NAME_FILE, FileOutputCommitter.class},
- {COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class},
- {COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class},
- {InternalCommitterConstants.COMMITTER_NAME_STAGING,
- StagingCommitter.class},
- {COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class}
+ private static final Object[][] BINDINGS = {
+ {"", "", FileOutputCommitter.class, "Default Binding"},
+ {COMMITTER_NAME_FILE, "", FileOutputCommitter.class, "File committer in
FS"},
+ {COMMITTER_NAME_PARTITIONED, "", PartitionedStagingCommitter.class,
+ "partitoned committer in FS"},
+ {COMMITTER_NAME_STAGING, "", StagingCommitter.class, "staging committer
in FS"},
+ {COMMITTER_NAME_MAGIC, "", MagicS3GuardCommitter.class, "magic committer
in FS"},
+ {COMMITTER_NAME_DIRECTORY, "", DirectoryStagingCommitter.class, "Dir
committer in FS"},
+ {INVALID_NAME, "", null, "invalid committer in FS"},
+
+ {"", COMMITTER_NAME_FILE, FileOutputCommitter.class, "File committer in
task"},
+ {"", COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class,
+ "partioned committer in task"},
+ {"", COMMITTER_NAME_STAGING, StagingCommitter.class, "staging committer
in task"},
+ {"", COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class, "magic committer
in task"},
+ {"", COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class, "Dir
committer in task"},
+ {"", INVALID_NAME, null, "invalid committer in task"},
};
/**
- * This is a ref to the FS conf, so changes here are visible
- * to callers querying the FS config.
+ * Test array for parameterized test runs.
+ *
+ * @return the committer binding for this run.
*/
- private Configuration filesystemConfRef;
-
- private Configuration taskConfRef;
+ @Parameterized.Parameters(name = "{3}-fs=[{0}]-task=[{1}]-[{2}]")
+ public static Collection<Object[]> params() {
+ return Arrays.asList(BINDINGS);
+ }
- @Override
- public void setup() throws Exception {
- super.setup();
- jobId = randomJobId();
- attempt0 = "attempt_" + jobId + "_m_000000_0";
- taskAttempt0 = TaskAttemptID.forName(attempt0);
+ /**
+ * Name of committer to set in filesystem config. If "" do not set one.
+ */
+ private final String fsCommitterName;
- outDir = path(getMethodName());
- factory = new S3ACommitterFactory();
- Configuration conf = new Configuration();
- conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
- conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
- conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
- filesystemConfRef = getFileSystem().getConf();
- tContext = new TaskAttemptContextImpl(conf, taskAttempt0);
- taskConfRef = tContext.getConfiguration();
- }
+ /**
+ * Name of committer to set in job config.
+ */
+ private final String jobCommitterName;
- @Test
- public void testEverything() throws Throwable {
- testImplicitFileBinding();
- testBindingsInTask();
- testBindingsInFSConfig();
- testInvalidFileBinding();
- testInvalidTaskBinding();
- }
+ /**
+ * Expected committer class.
+ * If null: an exception is expected
+ */
+ private final Class<? extends AbstractS3ACommitter> committerClass;
/**
- * Verify that if all config options are unset, the FileOutputCommitter
- *
- * is returned.
+ * Description from parameters, simply for thread names to be more
informative.
*/
- public void testImplicitFileBinding() throws Throwable {
- taskConfRef.unset(FS_S3A_COMMITTER_NAME);
- filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
- assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
- }
+ private final String description;
/**
- * Verify that task bindings are picked up.
+ * Create a parameterized instance.
+ * @param fsCommitterName committer to set in filesystem config
+ * @param jobCommitterName committer to set in job config
+ * @param committerClass expected committer class
+ * @param description debug text for thread names.
*/
- public void testBindingsInTask() throws Throwable {
- // set this to an invalid value to be confident it is not
- // being checked.
- filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID");
- taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
- assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
- for (Object[] binding : bindings) {
- taskConfRef.set(FS_S3A_COMMITTER_NAME,
- (String) binding[0]);
- assertFactoryCreatesExpectedCommitter((Class) binding[1]);
- }
+ public ITestS3ACommitterFactory(
+ final String fsCommitterName,
+ final String jobCommitterName,
+ final Class<? extends AbstractS3ACommitter> committerClass,
+ final String description) {
+ this.fsCommitterName = fsCommitterName;
+ this.jobCommitterName = jobCommitterName;
+ this.committerClass = committerClass;
+ this.description = description;
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ final Configuration conf = super.createConfiguration();
+ // do not cache, because we want the committer one to pick up
+ // the fs with fs-specific configuration
+ conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, false);
+ removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_NAME);
+ maybeSetCommitterName(conf, fsCommitterName);
+ return conf;
}
/**
- * Verify that FS bindings are picked up.
+ * Set a committer name in a configuration.
+ * @param conf configuration to patch.
+ * @param name name. If "" the option is unset.
*/
- public void testBindingsInFSConfig() throws Throwable {
- taskConfRef.unset(FS_S3A_COMMITTER_NAME);
- filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
- assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
- for (Object[] binding : bindings) {
- taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]);
- assertFactoryCreatesExpectedCommitter((Class) binding[1]);
+ private static void maybeSetCommitterName(final Configuration conf, final
String name) {
+ if (!name.isEmpty()) {
+ conf.set(FS_S3A_COMMITTER_NAME, name);
+ } else {
+ conf.unset(FS_S3A_COMMITTER_NAME);
}
}
- /**
- * Create an invalid committer via the FS binding.
- */
- public void testInvalidFileBinding() throws Throwable {
- taskConfRef.unset(FS_S3A_COMMITTER_NAME);
- filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
- LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
- () -> createCommitter());
+ @Override
+ public void setup() throws Exception {
+ // destroy all filesystems from previous runs.
+ FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+ super.setup();
+ jobId = randomJobId();
+ attempt0 = "attempt_" + jobId + "_m_000000_0";
+ taskAttempt0 = TaskAttemptID.forName(attempt0);
+
+ outDir = methodPath();
+ factory = new S3ACommitterFactory();
+ final Configuration fsConf = getConfiguration();
+ JobConf jobConf = new JobConf(fsConf);
+ jobConf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
+ jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+ jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+ maybeSetCommitterName(jobConf, jobCommitterName);
+ tContext = new TaskAttemptContextImpl(jobConf, taskAttempt0);
+
+ LOG.info("Filesystem Committer='{}'; task='{}'",
+ fsConf.get(FS_S3A_COMMITTER_NAME),
+ jobConf.get(FS_S3A_COMMITTER_NAME));
Review Comment:
We could also log `description` here to make debugging easier.
##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/ITestS3ACommitterFactory.java:
##########
@@ -72,121 +85,156 @@ public class ITestS3ACommitterFactory extends
AbstractCommitITest {
* Parameterized list of bindings of committer name in config file to
* expected class instantiated.
*/
- private static final Object[][] bindings = {
- {COMMITTER_NAME_FILE, FileOutputCommitter.class},
- {COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class},
- {COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class},
- {InternalCommitterConstants.COMMITTER_NAME_STAGING,
- StagingCommitter.class},
- {COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class}
+ private static final Object[][] BINDINGS = {
+ {"", "", FileOutputCommitter.class, "Default Binding"},
+ {COMMITTER_NAME_FILE, "", FileOutputCommitter.class, "File committer in
FS"},
+ {COMMITTER_NAME_PARTITIONED, "", PartitionedStagingCommitter.class,
+ "partitoned committer in FS"},
+ {COMMITTER_NAME_STAGING, "", StagingCommitter.class, "staging committer
in FS"},
+ {COMMITTER_NAME_MAGIC, "", MagicS3GuardCommitter.class, "magic committer
in FS"},
+ {COMMITTER_NAME_DIRECTORY, "", DirectoryStagingCommitter.class, "Dir
committer in FS"},
+ {INVALID_NAME, "", null, "invalid committer in FS"},
+
+ {"", COMMITTER_NAME_FILE, FileOutputCommitter.class, "File committer in
task"},
+ {"", COMMITTER_NAME_PARTITIONED, PartitionedStagingCommitter.class,
+ "partioned committer in task"},
+ {"", COMMITTER_NAME_STAGING, StagingCommitter.class, "staging committer
in task"},
+ {"", COMMITTER_NAME_MAGIC, MagicS3GuardCommitter.class, "magic committer
in task"},
+ {"", COMMITTER_NAME_DIRECTORY, DirectoryStagingCommitter.class, "Dir
committer in task"},
+ {"", INVALID_NAME, null, "invalid committer in task"},
};
/**
- * This is a ref to the FS conf, so changes here are visible
- * to callers querying the FS config.
+ * Test array for parameterized test runs.
+ *
+ * @return the committer binding for this run.
*/
- private Configuration filesystemConfRef;
-
- private Configuration taskConfRef;
+ @Parameterized.Parameters(name = "{3}-fs=[{0}]-task=[{1}]-[{2}]")
+ public static Collection<Object[]> params() {
+ return Arrays.asList(BINDINGS);
+ }
- @Override
- public void setup() throws Exception {
- super.setup();
- jobId = randomJobId();
- attempt0 = "attempt_" + jobId + "_m_000000_0";
- taskAttempt0 = TaskAttemptID.forName(attempt0);
+ /**
+ * Name of committer to set in filesystem config. If "" do not set one.
+ */
+ private final String fsCommitterName;
- outDir = path(getMethodName());
- factory = new S3ACommitterFactory();
- Configuration conf = new Configuration();
- conf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
- conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
- conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
- filesystemConfRef = getFileSystem().getConf();
- tContext = new TaskAttemptContextImpl(conf, taskAttempt0);
- taskConfRef = tContext.getConfiguration();
- }
+ /**
+ * Name of committer to set in job config.
+ */
+ private final String jobCommitterName;
- @Test
- public void testEverything() throws Throwable {
- testImplicitFileBinding();
- testBindingsInTask();
- testBindingsInFSConfig();
- testInvalidFileBinding();
- testInvalidTaskBinding();
- }
+ /**
+ * Expected committer class.
+ * If null: an exception is expected
+ */
+ private final Class<? extends AbstractS3ACommitter> committerClass;
/**
- * Verify that if all config options are unset, the FileOutputCommitter
- *
- * is returned.
+ * Description from parameters, simply for thread names to be more
informative.
*/
- public void testImplicitFileBinding() throws Throwable {
- taskConfRef.unset(FS_S3A_COMMITTER_NAME);
- filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
- assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
- }
+ private final String description;
/**
- * Verify that task bindings are picked up.
+ * Create a parameterized instance.
+ * @param fsCommitterName committer to set in filesystem config
+ * @param jobCommitterName committer to set in job config
+ * @param committerClass expected committer class
+ * @param description debug text for thread names.
*/
- public void testBindingsInTask() throws Throwable {
- // set this to an invalid value to be confident it is not
- // being checked.
- filesystemConfRef.set(FS_S3A_COMMITTER_NAME, "INVALID");
- taskConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
- assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
- for (Object[] binding : bindings) {
- taskConfRef.set(FS_S3A_COMMITTER_NAME,
- (String) binding[0]);
- assertFactoryCreatesExpectedCommitter((Class) binding[1]);
- }
+ public ITestS3ACommitterFactory(
+ final String fsCommitterName,
+ final String jobCommitterName,
+ final Class<? extends AbstractS3ACommitter> committerClass,
+ final String description) {
+ this.fsCommitterName = fsCommitterName;
+ this.jobCommitterName = jobCommitterName;
+ this.committerClass = committerClass;
+ this.description = description;
+ }
+
+ @Override
+ protected Configuration createConfiguration() {
+ final Configuration conf = super.createConfiguration();
+ // do not cache, because we want the committer one to pick up
+ // the fs with fs-specific configuration
+ conf.setBoolean(FS_S3A_IMPL_DISABLE_CACHE, false);
+ removeBaseAndBucketOverrides(conf, FS_S3A_COMMITTER_NAME);
+ maybeSetCommitterName(conf, fsCommitterName);
+ return conf;
}
/**
- * Verify that FS bindings are picked up.
+ * Set a committer name in a configuration.
+ * @param conf configuration to patch.
+ * @param name name. If "" the option is unset.
*/
- public void testBindingsInFSConfig() throws Throwable {
- taskConfRef.unset(FS_S3A_COMMITTER_NAME);
- filesystemConfRef.set(FS_S3A_COMMITTER_NAME, COMMITTER_NAME_FILE);
- assertFactoryCreatesExpectedCommitter(FileOutputCommitter.class);
- for (Object[] binding : bindings) {
- taskConfRef.set(FS_S3A_COMMITTER_NAME, (String) binding[0]);
- assertFactoryCreatesExpectedCommitter((Class) binding[1]);
+ private static void maybeSetCommitterName(final Configuration conf, final
String name) {
+ if (!name.isEmpty()) {
+ conf.set(FS_S3A_COMMITTER_NAME, name);
+ } else {
+ conf.unset(FS_S3A_COMMITTER_NAME);
}
}
- /**
- * Create an invalid committer via the FS binding.
- */
- public void testInvalidFileBinding() throws Throwable {
- taskConfRef.unset(FS_S3A_COMMITTER_NAME);
- filesystemConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
- LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
- () -> createCommitter());
+ @Override
+ public void setup() throws Exception {
+ // destroy all filesystems from previous runs.
+ FileSystem.closeAllForUGI(UserGroupInformation.getCurrentUser());
+ super.setup();
+ jobId = randomJobId();
+ attempt0 = "attempt_" + jobId + "_m_000000_0";
+ taskAttempt0 = TaskAttemptID.forName(attempt0);
+
+ outDir = methodPath();
+ factory = new S3ACommitterFactory();
+ final Configuration fsConf = getConfiguration();
+ JobConf jobConf = new JobConf(fsConf);
+ jobConf.set(FileOutputFormat.OUTDIR, outDir.toUri().toString());
+ jobConf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt0);
+ jobConf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1);
+ maybeSetCommitterName(jobConf, jobCommitterName);
+ tContext = new TaskAttemptContextImpl(jobConf, taskAttempt0);
+
+ LOG.info("Filesystem Committer='{}'; task='{}'",
+ fsConf.get(FS_S3A_COMMITTER_NAME),
+ jobConf.get(FS_S3A_COMMITTER_NAME));
+ }
+
+
+ @Override
+ protected void deleteTestDirInTeardown() {
+ // no-op
}
/**
- * Create an invalid committer via the task attempt.
+ * Verify that if all config options are unset, the FileOutputCommitter
+ * is returned.
*/
- public void testInvalidTaskBinding() throws Throwable {
- filesystemConfRef.unset(FS_S3A_COMMITTER_NAME);
- taskConfRef.set(FS_S3A_COMMITTER_NAME, INVALID_NAME);
- LambdaTestUtils.intercept(PathCommitException.class, INVALID_NAME,
- () -> createCommitter());
+ @Test
+ public void testBinding() throws Throwable {
+ assertFactoryCreatesExpectedCommitter(committerClass);
}
/**
* Assert that the factory creates the expected committer.
+ * If a null committer is passed in, a {@link PathIOException}
+ * is expected.
* @param expected expected committer class.
- * @throws IOException IO failure.
+ * @throws Exception IO failure.
*/
- protected void assertFactoryCreatesExpectedCommitter(
+ private void assertFactoryCreatesExpectedCommitter(
final Class expected)
- throws IOException {
- assertEquals("Wrong Committer from factory",
- expected,
- createCommitter().getClass());
+ throws Exception {
+ describe("Creating committer: expected class \"%s\"", expected);
+ if (expected != null) {
+ assertEquals("Wrong Committer from factory",
+ expected,
+ createCommitter().getClass());
+ } else {
+ intercept(PathCommitException.class, () ->
+ createCommitter());
Review Comment:
nit: use the method reference `this::createCommitter` instead of a lambda?
> ITestS3ACommitterFactory failing
> --------------------------------
>
> Key: HADOOP-19189
> URL: https://issues.apache.org/jira/browse/HADOOP-19189
> Project: Hadoop Common
> Issue Type: Sub-task
> Components: fs/s3, test
> Affects Versions: 3.4.0
> Reporter: Steve Loughran
> Priority: Minor
> Labels: pull-request-available
>
> ITestS3ACommitterFactory has been failing for a while; it looks like
> changed committer settings aren't being picked up.
> {code}
> [ERROR]
> ITestS3ACommitterFactory.testEverything:115->testInvalidFileBinding:165
> Expected a org.apache.hadoop.fs.s3a.commit.PathCommitException to be thrown,
> but got the result: :
> FileOutputCommitter{PathOutputCommitter{context=TaskAttemptContextImpl{JobContextImpl
> {code}
> I've spent some time looking at it, and it is happening because the test sets
> the option through the filesystem config reference of the local test FS, not
> that of the filesystem created by the committer, which is where the option is
> picked up.
> I've tried to parameterize the test, but things are still playing up and I'm not
> sure how hard to try to fix it.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]