Repository: crunch
Updated Branches:
  refs/heads/master 65f39198e -> 02828f8f0
CRUNCH-600: pass job credentials when building multiple outputs Signed-off-by: Micah Whitacre <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/02828f8f Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/02828f8f Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/02828f8f Branch: refs/heads/master Commit: 02828f8f08d7e6b427f3f03a9118bdd7ac6c3342 Parents: 65f3919 Author: Igor Bernstein <[email protected]> Authored: Sun Apr 10 15:42:10 2016 -0400 Committer: Micah Whitacre <[email protected]> Committed: Mon Apr 11 09:47:54 2016 -0500 ---------------------------------------------------------------------- .../it/java/org/apache/crunch/MRPipelineIT.java | 52 ++++++++++++++++++++ .../org/apache/crunch/io/CrunchOutputs.java | 3 +- 2 files changed, 54 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/02828f8f/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java index 6af3f84..8cda55b 100644 --- a/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java +++ b/crunch-core/src/it/java/org/apache/crunch/MRPipelineIT.java @@ -25,6 +25,7 @@ import java.io.FileFilter; import java.io.IOException; import java.io.Serializable; import java.net.URLEncoder; +import java.util.Map; import com.google.common.io.Files; import org.apache.commons.io.filefilter.SuffixFileFilter; @@ -33,10 +34,20 @@ import org.apache.crunch.fn.FilterFns; import org.apache.crunch.fn.IdentityFn; import org.apache.crunch.impl.mr.MRPipeline; import org.apache.crunch.impl.mr.plan.PlanningParameters; +import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.To; +import 
org.apache.crunch.io.text.TextFileTarget; import org.apache.crunch.test.TemporaryPath; import org.apache.crunch.test.TemporaryPaths; +import org.apache.crunch.types.Converter; +import org.apache.crunch.types.PType; import org.apache.crunch.types.writable.Writables; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.junit.Rule; import org.junit.Test; @@ -109,4 +120,45 @@ public class MRPipelineIT implements Serializable { String regex = ".*_\\d{4}-\\d{2}-\\d{2}_\\d{2}\\.\\d{2}\\.\\d{2}\\.\\d{3}_jobplan\\.dot"; assertTrue("DOT file name '" + fileName + "' did not match regex '" + regex + "'.", fileName.matches(regex)); } + + @Test + public void testJobCredentials() throws IOException { + Pipeline pipeline = new MRPipeline(MRPipelineIT.class, tmpDir.getDefaultConfiguration()); + PCollection<String> lines = pipeline.readTextFile(tmpDir.copyResourceFileName("set1.txt")); + + pipeline.write(lines, new SecretTextFileTarget(tmpDir.getFile("output").getAbsolutePath())); + + PipelineResult pipelineResult = pipeline.done(); + assertTrue(pipelineResult.succeeded()); + } + + private static class SecretTextFileTarget extends TextFileTarget { + public SecretTextFileTarget(String path) { + super(path); + } + + @Override + public Target outputConf(String key, String value) { + return super.outputConf(key, value); + } + + @Override + public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { + Converter converter = ptype.getConverter(); + Class keyClass = converter.getKeyClass(); + Class valueClass = converter.getValueClass(); + FormatBundle fb = FormatBundle.forOutput(SecretTextOutputFormat.class); + configureForMapReduce(job, keyClass, valueClass, fb, outputPath, name); + + job.getCredentials().addSecretKey(new 
Text("secret"), "myPassword".getBytes()); + } + } + private static class SecretTextOutputFormat extends TextOutputFormat { + @Override + public RecordWriter getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException { + byte[] secret = job.getCredentials().getSecretKey(new Text("secret")); + assertEquals("job credentials did not match", "myPassword", new String(secret)); + return super.getRecordWriter(job); + } + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/02828f8f/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java index 653a401..a9621ba 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/CrunchOutputs.java @@ -21,6 +21,7 @@ import com.google.common.collect.Sets; import org.apache.crunch.CrunchRuntimeException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobID; @@ -203,7 +204,7 @@ public class CrunchOutputs<K, V> { private static Job getJob(JobID jobID, String namedOutput, Configuration baseConf) throws IOException { - Job job = new Job(new Configuration(baseConf)); + Job job = new Job(new JobConf(baseConf)); job.getConfiguration().set("crunch.namedoutput", namedOutput); setJobID(job, jobID, namedOutput); return job;
