[ 
https://issues.apache.org/jira/browse/CRUNCH-657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16221943#comment-16221943
 ] 

Jihed JOOBEUR commented on CRUNCH-657:
--------------------------------------

Hi [~jwills]

When i write: .write(Compress.snappy(new AvroPathPerKeyTarget(..)), snappy 
compression is not activated. The problem is that AvroPathPerKeyTarget extends 
FileTargetImpl but in its implementation outputConf is not transmitted.
As you can see above, my solution was to create a class that extends 
AvroPathPerKeyTarget and to override outputConf and configureForMapReduce.
i added:
{code:java}
        for (Map.Entry<String, String> e : extraConf.entrySet()) {
            bundle.set(e.getKey(), e.getValue());
        }
{code}
so the configuration is taken into account now.        

> Can't activate Snappy compression with AvroPathPerKeyTarget
> -----------------------------------------------------------
>
>                 Key: CRUNCH-657
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-657
>             Project: Crunch
>          Issue Type: Bug
>          Components: IO
>    Affects Versions: 0.11.0
>         Environment: Cloudera
>            Reporter: Jihed JOOBEUR
>
> Compress.snappy(AvroPathPerKeyTarget) doesn't work.
> I needed to create my own class that extends AvroPathPerKeyTarget.
> {code:java}
> public class CompressedAvroPathPerKeyTarget extends AvroPathPerKeyTarget {
>     private Map<String, String> extraConf = Maps.newHashMap();
>     public CompressedAvroPathPerKeyTarget(Path path) {
>         super(path);
>     }
>     @Override
>     public Target outputConf(String key, String value) {
>         extraConf.put(key, value);
>         return this;
>     }
>     @Override public void configureForMapReduce(Job job, PType<?> ptype, Path 
> outputPath, String name) {
>         AvroType<?> atype = (AvroType) ((PTableType) ptype).getValueType();
>         FormatBundle bundle = 
> FormatBundle.forOutput(AvroPathPerKeyOutputFormat.class);
>         String schemaParam;
>         if (name == null) {
>             schemaParam = "avro.output.schema";
>         } else {
>             schemaParam = "avro.output.schema." + name;
>         }
>         for (Map.Entry<String, String> e : extraConf.entrySet()) {
>             bundle.set(e.getKey(), e.getValue());
>         }
>         bundle.set(schemaParam, atype.getSchema().toString());
>         AvroMode.fromType(atype).configure(bundle);
>         configureForMapReduce(job, AvroWrapper.class, NullWritable.class, 
> bundle, outputPath, name);
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to