Updated Branches: refs/heads/master ebacb54c6 -> 146f5080b
CRUNCH-242: Control the input/output conversion via the Source and Target interfaces Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/146f5080 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/146f5080 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/146f5080 Branch: refs/heads/master Commit: 146f5080b5ff90657d3d4853d69179b4fd18ce0a Parents: ebacb54 Author: Josh Wills <[email protected]> Authored: Mon Jul 22 17:12:11 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Tue Jul 23 14:39:59 2013 -0700 ---------------------------------------------------------------------- .../apache/crunch/contrib/io/jdbc/DataBaseSource.java | 5 +++++ crunch-core/src/main/java/org/apache/crunch/Source.java | 7 +++++++ crunch-core/src/main/java/org/apache/crunch/Target.java | 10 ++++++++++ .../main/java/org/apache/crunch/impl/mr/plan/DoNode.java | 5 ++--- .../org/apache/crunch/impl/mr/plan/JobPrototype.java | 9 +++++---- .../java/org/apache/crunch/io/impl/FileSourceImpl.java | 6 ++++++ .../java/org/apache/crunch/io/impl/FileTargetImpl.java | 7 +++++++ .../java/org/apache/crunch/io/impl/SourceTargetImpl.java | 11 +++++++++++ .../apache/crunch/impl/mr/plan/JobNameBuilderTest.java | 2 +- .../org/apache/crunch/io/hbase/HBaseSourceTarget.java | 6 ++++++ .../java/org/apache/crunch/io/hbase/HBaseTarget.java | 6 ++++++ 11 files changed, 66 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java index 6ba5e06..83f509f 100644 --- a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.sql.Driver; import org.apache.crunch.Source; +import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; @@ -123,4 +124,8 @@ public class DataBaseSource<T extends DBWritable & Writable> implements Source<T return ptype; } + @Override + public Converter<?, ?, ?, ?> getConverter() { + return ptype.getConverter(); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/Source.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java index b744c8f..b0a0449 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Source.java +++ b/crunch-core/src/main/java/org/apache/crunch/Source.java @@ -19,6 +19,7 @@ package org.apache.crunch; import java.io.IOException; +import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; @@ -35,6 +36,12 @@ public interface Source<T> { PType<T> getType(); /** + * Returns the {@code Converter} used for mapping the inputs from this instance + * into {@code PCollection} or {@code PTable} values. + */ + Converter<?, ?, ?, ?> getConverter(); + + /** * Configure the given job to use this source as an input. * * @param job http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/Target.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java index 48dc2cd..65ad67d 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Target.java +++ b/crunch-core/src/main/java/org/apache/crunch/Target.java @@ -18,6 +18,7 @@ package org.apache.crunch; import org.apache.crunch.io.OutputHandler; +import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; @@ -80,6 +81,15 @@ public interface Target { boolean accept(OutputHandler handler, PType<?> ptype); /** + * Returns the {@code Converter} to use for mapping from the output {@code PCollection} + * into the output values expected by this instance. + * + * @param ptype The {@code PType} of the data that is being written to this instance + * @return A valid {@code Converter} for the output represented by this instance + */ + Converter<?, ?, ?, ?> getConverter(PType<?> ptype); + + /** * Attempt to create the {@code SourceTarget} type that corresponds to this {@code Target} * for the given {@code PType}, if possible. If it is not possible, return {@code null}. * http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java index 2d6d590..87d0a5b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/DoNode.java @@ -63,8 +63,7 @@ public class DoNode { return new DoNode(fn, name, ptype, NO_CHILDREN, ptype.getGroupingConverter(), null); } - public static <S> DoNode createOutputNode(String name, PType<S> ptype) { - Converter outputConverter = ptype.getConverter(); + public static DoNode createOutputNode(String name, Converter outputConverter, PType<?> ptype) { DoFn<?, ?> fn = ptype.getOutputMapFn(); return new DoNode(fn, name, ptype, NO_CHILDREN, outputConverter, null); } @@ -135,7 +134,7 @@ public class DoNode { Converter inputConverter = null; if (inputNode) { if (nodeContext == NodeContext.MAP) { - inputConverter = ptype.getConverter(); + inputConverter = source.getConverter(); } else { inputConverter = ((PGroupedTableType<?, ?>) ptype).getGroupingConverter(); } http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java index da13611..c733323 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mr/plan/JobPrototype.java @@ -36,6 +36,7 @@ import org.apache.crunch.impl.mr.run.CrunchMapper; import org.apache.crunch.impl.mr.run.CrunchReducer; import org.apache.crunch.impl.mr.run.NodeContext; import org.apache.crunch.impl.mr.run.RTNode; +import org.apache.crunch.types.PType; import org.apache.crunch.util.DistCache; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -149,8 +150,8 @@ class JobPrototype { DoNode node = null; for (NodePath nodePath : targetsToNodePaths.get(target)) { if (node == null) { - PCollectionImpl<?> collect = nodePath.tail(); - node = DoNode.createOutputNode(target.toString(), collect.getPType()); + PType<?> ptype = nodePath.tail().getPType(); + node = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype); outputHandler.configureNode(node, target); } outputNodes.add(walkPath(nodePath.descendingIterator(), node)); @@ -163,8 +164,8 @@ class JobPrototype { DoNode node = null; for (NodePath nodePath : mapSideNodePaths.get(target)) { if (node == null) { - PCollectionImpl<?> collect = nodePath.tail(); - node = DoNode.createOutputNode(target.toString(), collect.getPType()); + PType<?> ptype = nodePath.tail().getPType(); + node = DoNode.createOutputNode(target.toString(), target.getConverter(ptype), ptype); outputHandler.configureNode(node, target); } mapSideNodes.add(walkPath(nodePath.descendingIterator(), node)); http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java index b232abb..13645ba 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java @@ -31,6 +31,7 @@ import org.apache.crunch.io.CrunchInputs; import org.apache.crunch.io.FileReaderFactory; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.SourceTargetHelper; +import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -84,6 +85,11 @@ public class FileSourceImpl<T> implements Source<T> { } @Override + public Converter<?, ?, ?, ?> getConverter() { + return ptype.getConverter(); + } + + @Override public void configureSource(Job job, int inputId) throws IOException { if (inputId == -1) { for (Path path : paths) { http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java index 07c63df..cbd87e3 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java @@ -86,6 +86,12 @@ public class FileTargetImpl implements PathTarget { return true; } + @Override + public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) { + return ptype.getConverter(); + } + + @Override public void handleOutputs(Configuration conf, Path workingPath, int index) throws IOException { FileSystem srcFs = workingPath.getFileSystem(conf); Path src = getSourcePattern(workingPath, index); @@ -254,4 +260,5 @@ public class FileTargetImpl implements PathTarget { } return exists; } + } http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java index 5dd4d69..68c9430 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java @@ -24,6 +24,7 @@ import org.apache.crunch.Source; import org.apache.crunch.SourceTarget; import org.apache.crunch.Target; import org.apache.crunch.io.OutputHandler; +import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.Job; @@ -94,4 +95,14 @@ class SourceTargetImpl<T> implements SourceTarget<T> { public long getLastModifiedAt(Configuration configuration) { return source.getLastModifiedAt(configuration); } + + @Override + public Converter<?, ?, ?, ?> getConverter() { + return source.getConverter(); + } + + @Override + public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) { + return target.getConverter(ptype); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java index 7963c83..0a30fa4 100644 --- a/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java +++ b/crunch-core/src/test/java/org/apache/crunch/impl/mr/plan/JobNameBuilderTest.java @@ -30,7 +30,7 @@ public class JobNameBuilderTest { public void testBuild() { final String pipelineName = "PipelineName"; final String nodeName = "outputNode"; - DoNode doNode = DoNode.createOutputNode(nodeName, Writables.strings()); + DoNode doNode = DoNode.createOutputNode(nodeName, Writables.strings().getConverter(), Writables.strings()); JobNameBuilder jobNameBuilder = new JobNameBuilder(pipelineName); jobNameBuilder.visit(Lists.newArrayList(doNode)); String jobName = jobNameBuilder.build(); http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java index 6a5a124..2f5a160 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java @@ -30,6 +30,7 @@ import org.apache.crunch.TableSource; import org.apache.crunch.impl.mr.run.CrunchMapper; import org.apache.crunch.io.CrunchInputs; import org.apache.crunch.io.FormatBundle; +import org.apache.crunch.types.Converter; import org.apache.crunch.types.PTableType; import org.apache.crunch.types.PType; import org.apache.crunch.types.writable.Writables; @@ -130,4 +131,9 @@ public class HBaseSourceTarget extends HBaseTarget implements SourceTarget<Pair< LOG.warn("Cannot determine last modified time for source: " + toString()); return -1; } + + @Override + public Converter<?, ?, ?, ?> getConverter() { + return PTYPE.getConverter(); + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/146f5080/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java index 83d62c8..69a260e 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java @@ -28,6 +28,7 @@ import org.apache.crunch.io.CrunchOutputs; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.MapReduceTarget; import org.apache.crunch.io.OutputHandler; +import org.apache.crunch.types.Converter; import org.apache.crunch.types.PType; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; @@ -120,4 +121,9 @@ public class HBaseTarget implements MapReduceTarget { LOG.info("HBaseTarget ignores checks for existing outputs..."); return false; } + + @Override + public Converter<?, ?, ?, ?> getConverter(final PType<?> ptype) { + return ptype.getConverter(); + } }
