[ 
https://issues.apache.org/jira/browse/FLINK-6185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15941797#comment-15941797
 ] 

Luke Hutchison edited comment on FLINK-6185 at 3/25/17 8:22 PM:
----------------------------------------------------------------

Here's my simple gzip OutputFormat though, in case anyone else is looking for a 
quick solution:

{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class GZippableTextOutputFormat<T> implements OutputFormat<T> {
    private static final long serialVersionUID = 1L;

    private File file;
    // Created in open() on the task side; transient so the format itself stays serializable
    private transient PrintWriter writer;
    private boolean gzip;

    public GZippableTextOutputFormat(File file, WriteMode writeMode, boolean gzip) {
        if (writeMode != WriteMode.OVERWRITE) {
            // Make this explicit, since we're about to overwrite the file
            throw new IllegalArgumentException("writeMode must be WriteMode.OVERWRITE");
        }
        // Append ".gz" only when compressing and the extension is not already present
        this.file = gzip && !file.getPath().endsWith(".gz") ? new File(file.getPath() + ".gz") : file;
        this.gzip = gzip;
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        if (taskNumber < 0) {
            throw new IllegalArgumentException("Invalid task number");
        }
        // All records go to a single local file, so only one task may write to it
        if (numTasks == 0 || numTasks > 1) {
            throw new IllegalArgumentException(
                    "must call setParallelism(1) to use " + GZippableTextOutputFormat.class.getName());
        }
        try {
            writer = gzip ? new PrintWriter(new GZIPOutputStream(new FileOutputStream(file)))
                    : new PrintWriter(new FileOutputStream(file));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public File getFile() {
        return file;
    }

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void writeRecord(T record) throws IOException {
        writer.println(record.toString());
    }

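    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            // PrintWriter swallows I/O errors, so report them once at the end
            if (writer.checkError()) {
                throw new IOException("Error while writing " + file);
            }
        }
    }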

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "(" + file + ")";
    }
}
{code}
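
For anyone who wants to check the compression part without a Flink job, here is a minimal stand-alone sketch of the same idea: a PrintWriter layered over a GZIPOutputStream, then read back. It uses only the JDK. The class name GzipLineRoundTrip and its two methods are made up for this illustration and are not part of Flink or of the class above; it writes to memory rather than to a file, and it fixes the charset to UTF-8, which the class above does not do.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Hypothetical stand-alone helper for illustration only. */
final class GzipLineRoundTrip {
    private GzipLineRoundTrip() {}

    /** Writes record.toString() one per line, gzip-compressed, and returns the raw bytes. */
    static byte[] writeLines(List<?> records) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try {
            PrintWriter writer = new PrintWriter(
                    new OutputStreamWriter(new GZIPOutputStream(sink), StandardCharsets.UTF_8));
            for (Object record : records) {
                writer.println(record.toString());
            }
            // Closing the writer closes the GZIPOutputStream, which writes the gzip trailer
            writer.close();
            // PrintWriter never throws IOException itself, so ask it whether anything failed
            if (writer.checkError()) {
                throw new IOException("write failed");
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    /** Decompresses the bytes and splits them back into lines. */
    static List<String> readLines(byte[] gzipped) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(gzipped)), StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }
}
```

Forgetting to close the writer is the usual way to end up with a truncated .gz file, since the gzip trailer is only written on close.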


was (Author: lukehutch):
Here's my simple gzip OutputFormat though, in case anyone else is looking for a 
quick solution:

{code}
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.zip.GZIPOutputStream;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class GZippableTextOutputFormat<T> implements OutputFormat<T> {
    private static final long serialVersionUID = 1L;

    private File file;
    private PrintWriter writer;
    private boolean gzip;

    public GZippableTextOutputFormat(File file, WriteMode writeMode, boolean gzip) {
        if (writeMode != WriteMode.OVERWRITE) {
            // Make this explicit, since we're about to overwrite the file
            throw new IllegalArgumentException("writeMode must be WriteMode.OVERWRITE");
        }
        this.file = file.getPath().endsWith(".gz") == gzip ? file : new File(file.getPath() + ".gz");
        this.gzip = gzip;
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        if (taskNumber < 0) {
            throw new IllegalArgumentException("Invalid task number");
        }
        if (numTasks == 0 || numTasks > 1) {
            throw new IllegalArgumentException(
                    "must call setParallelism(1) to use " + ZippedJSONCollectionOutputFormat.class.getName());
        }
        try {
            writer = gzip ? new PrintWriter(new GZIPOutputStream(new FileOutputStream(file)))
                    : new PrintWriter(new FileOutputStream(file));
        } catch (Exception e) {
            close();
            throw new RuntimeException(e);
        }
    }

    public File getFile() {
        return file;
    }

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void writeRecord(T record) throws IOException {
        writer.println(record.toString());
    }

    @Override
    public void close() throws IOException {
        writer.close();
    }

    @Override
    public String toString() {
        return this.getClass().getSimpleName() + "(" + file + ")";
    }
}
{code}

> Output writers and OutputFormats need to support compression
> ------------------------------------------------------------
>
>                 Key: FLINK-6185
>                 URL: https://issues.apache.org/jira/browse/FLINK-6185
>             Project: Flink
>          Issue Type: Bug
>          Components: Core
>    Affects Versions: 1.2.0
>            Reporter: Luke Hutchison
>            Priority: Minor
>
> File sources (such as {{ExecutionEnvironment#readCsvFile()}}) and sinks (such 
> as {{FileOutputFormat}} and its subclasses, and methods such as 
> {{DataSet#writeAsText()}}) need the ability to transparently decompress and 
> compress files. Primarily gzip would be useful, but it would be nice if this 
> were pluggable to support bzip2, xz, etc.
> There could be options for autodetect (based on file extension and/or file 
> content), which could be the default, as well as no compression or a selected 
> compression method.
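
The autodetect option described above (by file extension and/or by file content) could look roughly like the sketch below. This is only an illustration under stated assumptions: GzipSniffer and its methods are hypothetical names, nothing here is an existing Flink API, and it handles gzip only. Content detection relies on the two magic bytes 0x1f 0x8b that start every gzip stream.

```java
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.Locale;
import java.util.zip.GZIPInputStream;

/** Hypothetical helper for illustration; not an existing Flink class. */
final class GzipSniffer {
    private GzipSniffer() {}

    /** Extension-based detection: cheap, but it trusts the file name. */
    static boolean hasGzipExtension(String path) {
        return path.toLowerCase(Locale.ROOT).endsWith(".gz");
    }

    /** Content-based detection: gzip streams begin with the bytes 0x1f 0x8b. */
    static boolean hasGzipMagic(byte[] header) {
        return header.length >= 2
                && (header[0] & 0xff) == 0x1f
                && (header[1] & 0xff) == 0x8b;
    }

    /** Peeks at the first two bytes and wraps the stream in a GZIPInputStream only if they match. */
    static InputStream openMaybeGzipped(InputStream raw) {
        try {
            BufferedInputStream in = new BufferedInputStream(raw);
            in.mark(2);
            int b0 = in.read();
            int b1 = in.read();
            in.reset(); // rewind so the consumer still sees the stream from its first byte
            boolean gzipped = b0 >= 0 && b1 >= 0
                    && hasGzipMagic(new byte[] {(byte) b0, (byte) b1});
            return gzipped ? new GZIPInputStream(in) : in;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A pluggable version would presumably map each extension or magic number to a stream factory, so that bzip2 or xz could be registered the same way; that part is left out here.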



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
