CRUNCH-275: Support extra config args on Source, Target, and SourceTarget
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/d2a979ca Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/d2a979ca Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/d2a979ca Branch: refs/heads/master Commit: d2a979ca6a3cb95e2394f2ba901ca1874ffc49fa Parents: 9b5e108 Author: Josh Wills <[email protected]> Authored: Sun Oct 6 15:29:44 2013 -0700 Committer: Josh Wills <[email protected]> Committed: Sun Oct 6 23:30:28 2013 -0700 ---------------------------------------------------------------------- .../crunch/contrib/io/jdbc/DataBaseSource.java | 98 +++++++++----------- .../src/main/java/org/apache/crunch/Source.java | 8 ++ .../java/org/apache/crunch/SourceTarget.java | 6 ++ .../src/main/java/org/apache/crunch/Target.java | 7 ++ .../java/org/apache/crunch/io/FormatBundle.java | 4 +- .../apache/crunch/io/avro/AvroFileTarget.java | 14 +-- .../crunch/io/avro/trevni/TrevniKeyTarget.java | 2 +- .../apache/crunch/io/impl/FileSourceImpl.java | 8 +- .../apache/crunch/io/impl/FileTargetImpl.java | 45 +++++++-- .../apache/crunch/io/impl/SourceTargetImpl.java | 19 ++++ .../apache/crunch/io/text/TextFileTarget.java | 3 +- .../crunch/io/hbase/HBaseSourceTarget.java | 15 +++ .../org/apache/crunch/io/hbase/HBaseTarget.java | 16 ++++ .../org/apache/crunch/io/hbase/HFileTarget.java | 31 +------ 14 files changed, 172 insertions(+), 104 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java ---------------------------------------------------------------------- diff --git a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java index 337ecb7..2c51b84 100644 --- 
a/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java +++ b/crunch-contrib/src/main/java/org/apache/crunch/contrib/io/jdbc/DataBaseSource.java @@ -17,19 +17,14 @@ */ package org.apache.crunch.contrib.io.jdbc; -import java.io.IOException; import java.sql.Driver; -import org.apache.crunch.Source; -import org.apache.crunch.io.CrunchInputs; import org.apache.crunch.io.FormatBundle; -import org.apache.crunch.types.Converter; -import org.apache.crunch.types.PType; +import org.apache.crunch.io.impl.FileSourceImpl; import org.apache.crunch.types.writable.Writables; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.lib.db.DBWritable; @@ -44,78 +39,83 @@ import org.apache.hadoop.mapreduce.lib.db.DBWritable; * * @param <T> The input type of this source */ -public class DataBaseSource<T extends DBWritable & Writable> implements Source<T> { - - private Class<T> inputClass; - private PType<T> ptype; - private String driverClass; - private String url; - private String username; - private String password; - private String selectClause; - public String countClause; - - private DataBaseSource(Class<T> inputClass) { - this.inputClass = inputClass; - this.ptype = Writables.writables(inputClass); +public class DataBaseSource<T extends DBWritable & Writable> extends FileSourceImpl<T> { + + private DataBaseSource(Class<T> inputClass, + String driverClassName, + String url, + String username, + String password, + String selectClause, + String countClause) { + super( + new Path("dbsource"), + Writables.writables(inputClass), + FormatBundle.forInput(DBInputFormat.class) + .set(DBConfiguration.DRIVER_CLASS_PROPERTY, driverClassName) + .set(DBConfiguration.URL_PROPERTY, url) + 
.set(DBConfiguration.USERNAME_PROPERTY, username) + .set(DBConfiguration.PASSWORD_PROPERTY, password) + .set(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass.getCanonicalName()) + .set(DBConfiguration.INPUT_QUERY, selectClause) + .set(DBConfiguration.INPUT_COUNT_QUERY, countClause)); } static class Builder<T extends DBWritable & Writable> { + private Class<T> inputClass; + private String driverClass; + private String url; + private String username; + private String password; + private String selectClause; + public String countClause; + private DataBaseSource<T> dataBaseSource; public Builder(Class<T> inputClass) { - this.dataBaseSource = new DataBaseSource<T>(inputClass); + this.inputClass = inputClass; } Builder<T> setDriverClass(Class<? extends Driver> driverClass) { - dataBaseSource.driverClass = driverClass.getName(); + this.driverClass = driverClass.getName(); return this; } Builder<T> setUrl(String url) { - dataBaseSource.url = url; + this.url = url; return this; } Builder<T> setUsername(String username) { - dataBaseSource.username = username; + this.username = username; return this; } Builder<T> setPassword(String password) { - dataBaseSource.password = password; + this.password = password; return this; } Builder<T> selectSQLQuery(String selectClause) { - dataBaseSource.selectClause = selectClause; + this.selectClause = selectClause; return this; } Builder<T> countSQLQuery(String countClause) { - dataBaseSource.countClause = countClause; + this.countClause = countClause; return this; } DataBaseSource<T> build() { - return dataBaseSource; - } - } - - @Override - public void configureSource(Job job, int inputId) throws IOException { - Configuration configuration = job.getConfiguration(); - DBConfiguration.configureDB(configuration, driverClass, url, username, password); - if (inputId == -1) { - job.setInputFormatClass(DBInputFormat.class); - DBInputFormat.setInput(job, inputClass, selectClause, countClause); - } else { - FormatBundle<DBInputFormat> bundle = 
FormatBundle.forInput(DBInputFormat.class) - .set(DBConfiguration.INPUT_CLASS_PROPERTY, inputClass.getCanonicalName()) - .set(DBConfiguration.INPUT_QUERY, selectClause) - .set(DBConfiguration.INPUT_COUNT_QUERY, countClause); - CrunchInputs.addInputPath(job, new Path("dbsource"), bundle, inputId); + return new DataBaseSource<T>( + inputClass, + driverClass, + url, + username, + password, + selectClause, + countClause); } } @@ -129,14 +129,4 @@ public class DataBaseSource<T extends DBWritable & Writable> implements Source<T public long getLastModifiedAt(Configuration configuration) { return -1; } - - @Override - public PType<T> getType() { - return ptype; - } - - @Override - public Converter<?, ?, ?, ?> getConverter() { - return ptype.getConverter(); - } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/Source.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Source.java b/crunch-core/src/main/java/org/apache/crunch/Source.java index b0a0449..b209dfc 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Source.java +++ b/crunch-core/src/main/java/org/apache/crunch/Source.java @@ -30,6 +30,14 @@ import org.apache.hadoop.mapreduce.Job; * */ public interface Source<T> { + + /** + * Adds the given key-value pair to the {@code Configuration} instance that is used to read + * this {@code Source<T>}. Allows for multiple inputs to re-use the same config keys with + * different values when necessary. + */ + Source<T> inputConf(String key, String value); + /** * Returns the {@code PType} for this source. 
*/ http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java index 09c03c6..80cd730 100644 --- a/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/SourceTarget.java @@ -23,4 +23,10 @@ package org.apache.crunch; * */ public interface SourceTarget<T> extends Source<T>, Target { + /** + * Adds the given key-value pair to the {@code Configuration} instance(s) that are used to + * read and write this {@code SourceTarget<T>}. Allows for multiple inputs and outputs to + * re-use the same config keys with different values when necessary. + */ + SourceTarget<T> conf(String key, String value); } http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/Target.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/Target.java b/crunch-core/src/main/java/org/apache/crunch/Target.java index 65ad67d..112c637 100644 --- a/crunch-core/src/main/java/org/apache/crunch/Target.java +++ b/crunch-core/src/main/java/org/apache/crunch/Target.java @@ -61,6 +61,13 @@ public interface Target { } /** + * Adds the given key-value pair to the {@code Configuration} instance that is used to write + * this {@code Target}. Allows for multiple target outputs to re-use the same config keys with + * different values when necessary. + */ + Target outputConf(String key, String value); + + /** * Apply the given {@code WriteMode} to this {@code Target} instance. 
* * @param writeMode The strategy for handling existing outputs http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java index 4796006..aa84fee 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/FormatBundle.java @@ -70,8 +70,8 @@ public class FormatBundle<K> implements Serializable, Writable, Configurable { return new FormatBundle<T>(inputFormatClass); } - public static <T extends OutputFormat<?, ?>> FormatBundle<T> forOutput(Class<T> inputFormatClass) { - return new FormatBundle<T>(inputFormatClass); + public static <T extends OutputFormat<?, ?>> FormatBundle<T> forOutput(Class<T> outputFormatClass) { + return new FormatBundle<T>(outputFormatClass); } public FormatBundle() { http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java index ea0179f..fc82361 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/AvroFileTarget.java @@ -20,6 +20,7 @@ package org.apache.crunch.io.avro; import org.apache.avro.mapred.AvroWrapper; import org.apache.crunch.SourceTarget; import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.OutputHandler; import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.FileTargetImpl; @@ -63,21 +64,16 @@ public 
class AvroFileTarget extends FileTargetImpl { @Override public void configureForMapReduce(Job job, PType<?> ptype, Path outputPath, String name) { AvroType<?> atype = (AvroType<?>) ptype; - Configuration conf = job.getConfiguration(); + FormatBundle bundle = FormatBundle.forOutput(AvroOutputFormat.class); String schemaParam = null; if (name == null) { schemaParam = "avro.output.schema"; } else { schemaParam = "avro.output.schema." + name; } - String outputSchema = conf.get(schemaParam); - if (outputSchema == null) { - conf.set(schemaParam, atype.getSchema().toString()); - } else if (!outputSchema.equals(atype.getSchema().toString())) { - throw new IllegalStateException("Avro targets must use the same output schema"); - } - Avros.configureReflectDataFactory(conf); - configureForMapReduce(job, AvroWrapper.class, NullWritable.class, AvroOutputFormat.class, + bundle.set(schemaParam, atype.getSchema().toString()); + Avros.configureReflectDataFactory(job.getConfiguration()); + configureForMapReduce(job, AvroWrapper.class, NullWritable.class, bundle, outputPath, name); } http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java index e1f2ab1..e7acc08 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/avro/trevni/TrevniKeyTarget.java @@ -83,7 +83,7 @@ public class TrevniKeyTarget extends FileTargetImpl { AvroJob.setMapOutputKeySchema(job, atype.getSchema()); Avros.configureReflectDataFactory(conf); - configureForMapReduce(job, AvroKey.class, NullWritable.class, TrevniOutputFormat.class, + configureForMapReduce(job, AvroKey.class, NullWritable.class, 
FormatBundle.forOutput(TrevniOutputFormat.class), outputPath, name); } else { FormatBundle<TrevniOutputFormat> bundle = FormatBundle.forOutput( http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java index a3cbdc8..766b9b0 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileSourceImpl.java @@ -84,7 +84,13 @@ public class FileSourceImpl<T> implements Source<T> { public List<Path> getPaths() { return paths; } - + + @Override + public Source<T> inputConf(String key, String value) { + inputBundle.set(key, value); + return this; + } + @Override public Converter<?, ?, ?, ?> getConverter() { return ptype.getConverter(); http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java index cbd87e3..8ae2589 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/FileTargetImpl.java @@ -18,17 +18,21 @@ package org.apache.crunch.io.impl; import java.io.IOException; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import com.google.common.collect.ImmutableMap; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CrunchRuntimeException; import 
org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; import org.apache.crunch.impl.mr.plan.PlanningParameters; import org.apache.crunch.io.CrunchOutputs; import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.OutputHandler; import org.apache.crunch.io.PathTarget; import org.apache.crunch.io.SourceTargetHelper; @@ -46,14 +50,30 @@ public class FileTargetImpl implements PathTarget { private static final Log LOG = LogFactory.getLog(FileTargetImpl.class); protected final Path path; - private final Class<? extends FileOutputFormat> outputFormatClass; + private final FormatBundle<? extends FileOutputFormat> formatBundle; private final FileNamingScheme fileNamingScheme; public FileTargetImpl(Path path, Class<? extends FileOutputFormat> outputFormatClass, - FileNamingScheme fileNamingScheme) { + FileNamingScheme fileNamingScheme) { + this(path, outputFormatClass, fileNamingScheme, ImmutableMap.<String, String>of()); + } + + public FileTargetImpl(Path path, Class<? 
extends FileOutputFormat> outputFormatClass, + FileNamingScheme fileNamingScheme, Map<String, String> extraConf) { this.path = path; - this.outputFormatClass = outputFormatClass; + this.formatBundle = FormatBundle.forOutput(outputFormatClass); this.fileNamingScheme = fileNamingScheme; + if (extraConf != null && !extraConf.isEmpty()) { + for (Map.Entry<String, String> e : extraConf.entrySet()) { + formatBundle.set(e.getKey(), e.getValue()); + } + } + } + + @Override + public Target outputConf(String key, String value) { + formatBundle.set(key, value); + return this; } @Override @@ -61,22 +81,29 @@ public class FileTargetImpl implements PathTarget { Converter converter = ptype.getConverter(); Class keyClass = converter.getKeyClass(); Class valueClass = converter.getValueClass(); - configureForMapReduce(job, keyClass, valueClass, outputFormatClass, outputPath, name); + configureForMapReduce(job, keyClass, valueClass, formatBundle, outputPath, name); } + @Deprecated protected void configureForMapReduce(Job job, Class keyClass, Class valueClass, Class outputFormatClass, Path outputPath, String name) { + configureForMapReduce(job, keyClass, valueClass, FormatBundle.forOutput(outputFormatClass), outputPath, name); + } + + protected void configureForMapReduce(Job job, Class keyClass, Class valueClass, + FormatBundle formatBundle, Path outputPath, String name) { try { FileOutputFormat.setOutputPath(job, outputPath); } catch (Exception e) { throw new RuntimeException(e); } if (name == null) { - job.setOutputFormatClass(outputFormatClass); + job.setOutputFormatClass(formatBundle.getFormatClass()); + formatBundle.configure(job.getConfiguration()); job.setOutputKeyClass(keyClass); job.setOutputValueClass(valueClass); } else { - CrunchOutputs.addNamedOutput(job, name, outputFormatClass, keyClass, valueClass); + CrunchOutputs.addNamedOutput(job, name, formatBundle, keyClass, valueClass); } } @@ -185,7 +212,11 @@ public class FileTargetImpl implements PathTarget { @Override public 
String toString() { - return new StringBuilder().append(outputFormatClass.getSimpleName()).append("(").append(path).append(")") + return new StringBuilder() + .append(formatBundle.getFormatClass().getSimpleName()) + .append("(") + .append(path) + .append(")") .toString(); } http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java index 68c9430..b15a00b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/impl/SourceTargetImpl.java @@ -40,6 +40,12 @@ class SourceTargetImpl<T> implements SourceTarget<T> { } @Override + public Source<T> inputConf(String key, String value) { + source.inputConf(key, value); + return this; + } + + @Override public PType<T> getType() { return source.getType(); } @@ -87,6 +93,12 @@ class SourceTargetImpl<T> implements SourceTarget<T> { } @Override + public Target outputConf(String key, String value) { + target.outputConf(key, value); + return this; + } + + @Override public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) { return target.handleExisting(strategy, lastModifiedAt, conf); } @@ -105,4 +117,11 @@ class SourceTargetImpl<T> implements SourceTarget<T> { public Converter<?, ?, ?, ?> getConverter(PType<?> ptype) { return target.getConverter(ptype); } + + @Override + public SourceTarget<T> conf(String key, String value) { + source.inputConf(key, value); + target.outputConf(key, value); + return this; + } } http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java ---------------------------------------------------------------------- diff 
--git a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java index 17ae7a6..4b9197b 100644 --- a/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java +++ b/crunch-core/src/main/java/org/apache/crunch/io/text/TextFileTarget.java @@ -20,6 +20,7 @@ package org.apache.crunch.io.text; import org.apache.avro.Schema; import org.apache.crunch.SourceTarget; import org.apache.crunch.io.FileNamingScheme; +import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.SequentialFileNamingScheme; import org.apache.crunch.io.impl.FileTargetImpl; import org.apache.crunch.types.Converter; @@ -72,7 +73,7 @@ public class TextFileTarget extends FileTargetImpl { Converter converter = ptype.getConverter(); Class keyClass = converter.getKeyClass(); Class valueClass = converter.getValueClass(); - configureForMapReduce(job, keyClass, valueClass, getOutputFormat(ptype), outputPath, name); + configureForMapReduce(job, keyClass, valueClass, FormatBundle.forOutput(getOutputFormat(ptype)), outputPath, name); } @Override http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java index c003e48..1b2a03e 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseSourceTarget.java @@ -26,6 +26,8 @@ import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.Pair; +import org.apache.crunch.Source; +import org.apache.crunch.SourceTarget; import 
org.apache.crunch.TableSource; import org.apache.crunch.impl.mr.run.CrunchMapper; import org.apache.crunch.io.CrunchInputs; @@ -73,6 +75,12 @@ public class HBaseSourceTarget extends HBaseTarget implements } @Override + public Source<Pair<ImmutableBytesWritable, Result>> inputConf(String key, String value) { + inputBundle.set(key, value); + return this; + } + + @Override public PType<Pair<ImmutableBytesWritable, Result>> getType() { return PTYPE; } @@ -146,6 +154,13 @@ public class HBaseSourceTarget extends HBaseTarget implements return new HTableIterable(htable, scan); } + @Override + public SourceTarget<Pair<ImmutableBytesWritable, Result>> conf(String key, String value) { + inputConf(key, value); + outputConf(key, value); + return this; + } + private static class HTableIterable implements Iterable<Pair<ImmutableBytesWritable, Result>> { private final HTable table; private final Scan scan; http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java index 69a260e..2c3c239 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HBaseTarget.java @@ -18,12 +18,15 @@ package org.apache.crunch.io.hbase; import java.io.IOException; +import java.util.Map; +import com.google.common.collect.Maps; import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.crunch.CrunchRuntimeException; import org.apache.crunch.SourceTarget; +import org.apache.crunch.Target; import org.apache.crunch.io.CrunchOutputs; import org.apache.crunch.io.FormatBundle; import org.apache.crunch.io.MapReduceTarget; @@ -45,6 
+48,7 @@ public class HBaseTarget implements MapReduceTarget { private static final Log LOG = LogFactory.getLog(HBaseTarget.class); protected String table; + private Map<String, String> extraConf = Maps.newHashMap(); public HBaseTarget(String table) { this.table = table; @@ -100,10 +104,16 @@ public class HBaseTarget implements MapReduceTarget { job.setOutputKeyClass(ImmutableBytesWritable.class); job.setOutputValueClass(typeClass); conf.set(TableOutputFormat.OUTPUT_TABLE, table); + for (Map.Entry<String, String> e : extraConf.entrySet()) { + conf.set(e.getKey(), e.getValue()); + } } else { FormatBundle<TableOutputFormat> bundle = FormatBundle.forOutput( TableOutputFormat.class); bundle.set(TableOutputFormat.OUTPUT_TABLE, table); + for (Map.Entry<String, String> e : extraConf.entrySet()) { + bundle.set(e.getKey(), e.getValue()); + } CrunchOutputs.addNamedOutput(job, name, bundle, ImmutableBytesWritable.class, @@ -117,6 +127,12 @@ public class HBaseTarget implements MapReduceTarget { } @Override + public Target outputConf(String key, String value) { + extraConf.put(key, value); + return this; + } + + @Override public boolean handleExisting(WriteMode strategy, long lastModifiedAt, Configuration conf) { LOG.info("HBaseTarget ignores checks for existing outputs..."); return false; http://git-wip-us.apache.org/repos/asf/crunch/blob/d2a979ca/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java ---------------------------------------------------------------------- diff --git a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java index 1cef4fa..0a78bd8 100644 --- a/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java +++ b/crunch-hbase/src/main/java/org/apache/crunch/io/hbase/HFileTarget.java @@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class HFileTarget extends FileTargetImpl { private static final 
HColumnDescriptor DEFAULT_COLUMN_DESCRIPTOR = new HColumnDescriptor(); - private final HColumnDescriptor hcol; public HFileTarget(String path) { this(new Path(path)); @@ -45,34 +44,8 @@ public class HFileTarget extends FileTargetImpl { public HFileTarget(Path path, HColumnDescriptor hcol) { super(path, HFileOutputFormatForCrunch.class, SequentialFileNamingScheme.getInstance()); - this.hcol = Preconditions.checkNotNull(hcol); - } - - @Override - protected void configureForMapReduce( - Job job, - Class keyClass, - Class valueClass, - Class outputFormatClass, - Path outputPath, - String name) { - try { - FileOutputFormat.setOutputPath(job, outputPath); - } catch (Exception e) { - throw new RuntimeException(e); - } - - String hcolStr = Hex.encodeHexString(WritableUtils.toByteArray(hcol)); - if (name == null) { - job.setOutputFormatClass(HFileOutputFormatForCrunch.class); - job.setOutputKeyClass(keyClass); - job.setOutputValueClass(valueClass); - job.getConfiguration().set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, hcolStr); - } else { - FormatBundle<HFileOutputFormatForCrunch> bundle = FormatBundle.forOutput(HFileOutputFormatForCrunch.class); - bundle.set(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, hcolStr); - CrunchOutputs.addNamedOutput(job, name, bundle, keyClass, valueClass); - } + Preconditions.checkNotNull(hcol); + outputConf(HFileOutputFormatForCrunch.HCOLUMN_DESCRIPTOR_KEY, Hex.encodeHexString(WritableUtils.toByteArray(hcol))); } @Override
