[ 
https://issues.apache.org/jira/browse/CRUNCH-657?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jihed JOOBEUR updated CRUNCH-657:
---------------------------------
    Description: 
Compress.snappy(AvroPathPerKeyTarget) doesn't work.

I needed to create my own class that extends AvroPathPerKeyTarget.

{code:java}
/**
 * Workaround target for CRUNCH-657: an {@link AvroPathPerKeyTarget} that
 * records every setting passed to {@link #outputConf(String, String)} and
 * copies those settings into the {@link FormatBundle} it builds when the
 * job is configured.
 */
public class CompressedAvroPathPerKeyTarget extends AvroPathPerKeyTarget {

    /** Settings recorded by outputConf until the job is configured. */
    private final Map<String, String> extraConf = Maps.newHashMap();

    /**
     * @param path output path, passed unchanged to the superclass
     */
    public CompressedAvroPathPerKeyTarget(Path path) {
        super(path);
    }

    /**
     * Records one output setting for later use.
     *
     * @param key   configuration property name
     * @param value configuration property value
     * @return this target, so calls can be chained
     */
    @Override
    public Target outputConf(String key, String value) {
        extraConf.put(key, value);
        return this;
    }

    /**
     * Builds the output bundle for this target and hands it to the
     * inherited six-argument overload.
     *
     * @param job        job being configured
     * @param ptype      table type; its value type is cast to an Avro type
     * @param outputPath path passed on to the inherited overload
     * @param name       suffix for the schema key; when {@code null} the
     *                   un-suffixed key is used
     */
    @Override
    public void configureForMapReduce(Job job, PType<?> ptype,
            Path outputPath, String name) {
        // Wildcard casts replace the raw-type casts; after erasure the
        // runtime checks are unchanged.
        AvroType<?> atype =
                (AvroType<?>) ((PTableType<?, ?>) ptype).getValueType();
        FormatBundle bundle =
                FormatBundle.forOutput(AvroPathPerKeyOutputFormat.class);
        String schemaParam = (name == null)
                ? "avro.output.schema"
                : "avro.output.schema." + name;

        // Apply the recorded settings first; the schema is written after
        // them, as in the original ordering.
        for (Map.Entry<String, String> e : extraConf.entrySet()) {
            bundle.set(e.getKey(), e.getValue());
        }

        bundle.set(schemaParam, atype.getSchema().toString());
        AvroMode.fromType(atype).configure(bundle);
        configureForMapReduce(job, AvroWrapper.class, NullWritable.class,
                bundle, outputPath, name);
    }
}
{code}


  was:
Compress.snappy(AvroPathPerKeyTarget) does'nt work.

I needed to create my own class that extends AvroPathPerKeyTarget.

{code:java}
/**
 * Reporter's workaround class: extends AvroPathPerKeyTarget, records the
 * settings passed to outputConf and copies them into the FormatBundle it
 * builds in configureForMapReduce.
 */
public class CompressedAvroPathPerKeyTarget extends AvroPathPerKeyTarget {

    // Settings recorded by outputConf, applied in configureForMapReduce.
    private Map<String, String> extraConf = Maps.newHashMap();

    /** Passes the output path straight to the superclass constructor. */
    public CompressedAvroPathPerKeyTarget(Path path) {
        super(path);
    }

    /**
     * Stores the key/value pair in extraConf and returns this target.
     * NOTE(review): presumably Compress.snappy delivers its options through
     * this method - confirm against the Crunch sources.
     */
    @Override
    public Target outputConf(String key, String value) {
        extraConf.put(key, value);
        return this;
    }

    /**
     * Builds a FormatBundle for AvroPathPerKeyOutputFormat, copies the
     * recorded settings and the Avro schema into it, and delegates to the
     * six-argument configureForMapReduce overload.
     */
    @Override public void configureForMapReduce(Job job, PType<?> ptype, Path 
outputPath, String name) {
        // The value type of the table type is used as the Avro type.
        AvroType<?> atype = (AvroType) ((PTableType) ptype).getValueType();
        FormatBundle bundle = 
FormatBundle.forOutput(AvroPathPerKeyOutputFormat.class);
        // The schema key gets a "." + name suffix only when name is non-null.
        String schemaParam;
        if (name == null) {
            schemaParam = "avro.output.schema";
        } else {
            schemaParam = "avro.output.schema." + name;
        }

        // Copy every recorded setting into the bundle.
        for (Map.Entry<String, String> e : extraConf.entrySet()) {
            bundle.set(e.getKey(), e.getValue());
        }

        // The schema is written after the recorded settings.
        bundle.set(schemaParam, atype.getSchema().toString());
        AvroMode.fromType(atype).configure(bundle);
        configureForMapReduce(job, AvroWrapper.class, NullWritable.class, 
bundle, outputPath, name);
    }
}
{code}



> Can't activate Snappy compression with AvroPathPerKeyTarget
> -----------------------------------------------------------
>
>                 Key: CRUNCH-657
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-657
>             Project: Crunch
>          Issue Type: Bug
>          Components: IO
>    Affects Versions: 0.11.0
>         Environment: Cloudera
>            Reporter: Jihed JOOBEUR
>
> Compress.snappy(AvroPathPerKeyTarget) doesn't work.
> I needed to create my own class that extends AvroPathPerKeyTarget.
> {code:java}
> public class CompressedAvroPathPerKeyTarget extends AvroPathPerKeyTarget { // reporter's workaround class
>     private Map<String, String> extraConf = Maps.newHashMap(); // settings recorded by outputConf
>     public CompressedAvroPathPerKeyTarget(Path path) {
>         super(path);
>     }
>     @Override
>     public Target outputConf(String key, String value) {
>         extraConf.put(key, value); // kept until configureForMapReduce runs
>         return this;
>     }
>     @Override public void configureForMapReduce(Job job, PType<?> ptype, Path 
> outputPath, String name) {
>         AvroType<?> atype = (AvroType) ((PTableType) ptype).getValueType(); // value type of the table type
>         FormatBundle bundle = 
> FormatBundle.forOutput(AvroPathPerKeyOutputFormat.class);
>         String schemaParam; // schema key, suffixed with "." + name when name is non-null
>         if (name == null) {
>             schemaParam = "avro.output.schema";
>         } else {
>             schemaParam = "avro.output.schema." + name;
>         }
>         for (Map.Entry<String, String> e : extraConf.entrySet()) {
>             bundle.set(e.getKey(), e.getValue()); // copy each recorded setting into the bundle
>         }
>         bundle.set(schemaParam, atype.getSchema().toString()); // written after the recorded settings
>         AvroMode.fromType(atype).configure(bundle);
>         configureForMapReduce(job, AvroWrapper.class, NullWritable.class, 
> bundle, outputPath, name);
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to