Repository: flink
Updated Branches:
  refs/heads/master c854d5260 -> 16ab1fd97


[FLINK-1981] add support for GZIP files

* register decompression algorithms with file extensions for extensibility
* fit deflate decompression into this scheme
* add support for GZIP files
* test support for deflate and GZIP files with the CsvInputFormat
* replace Apache Commons' Validate with Guava's Preconditions
* add documentation on reading compressed files

This closes #762.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/16ab1fd9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/16ab1fd9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/16ab1fd9

Branch: refs/heads/master
Commit: 16ab1fd97f5ffc2d77a288b5b561f9cbbbd1f63c
Parents: c854d52
Author: Sebastian Kruse <sebastian.kr...@hpi.de>
Authored: Tue Jun 2 18:58:35 2015 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Mon Jun 8 10:43:45 2015 +0200

----------------------------------------------------------------------
 docs/apis/programming_guide.md                  |  33 ++++++
 .../flink/api/common/io/FileInputFormat.java    |  79 ++++++++++++--
 .../io/InflaterInputStreamFSInputWrapper.java   |  32 ------
 .../DeflateInflaterInputStreamFactory.java      |  49 +++++++++
 .../GzipInflaterInputStreamFactory.java         |  48 +++++++++
 .../compression/InflaterInputStreamFactory.java |  44 ++++++++
 .../common/io/GenericCsvInputFormatTest.java    | 105 +++++++++++++++++++
 7 files changed, 351 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/docs/apis/programming_guide.md
----------------------------------------------------------------------
diff --git a/docs/apis/programming_guide.md b/docs/apis/programming_guide.md
index 517781f..17903a9 100644
--- a/docs/apis/programming_guide.md
+++ b/docs/apis/programming_guide.md
@@ -1880,6 +1880,39 @@ env.readTextFile("file:///path/with.nested/files").withParameters(parameters)
 
 </div>
 </div>
+
+### Read Compressed Files
+
+Flink currently supports transparent decompression of input files if they are marked with an appropriate file extension. In particular, this means that no further configuration of the input formats is necessary, and every `FileInputFormat`, including custom input formats, supports reading compressed files. Please note that compressed files might not be read in parallel, thus impacting job scalability.
+
+The following table lists the currently supported compression methods.
+
+<br />
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left" style="width: 20%">Compression method</th>
+      <th class="text-left">File extensions</th>
+      <th class="text-left" style="width: 20%">Parallelizable</th>
+    </tr>
+  </thead>
+
+  <tbody>
+    <tr>
+      <td><strong>DEFLATE</strong></td>
+      <td>`.deflate`</td>
+      <td>no</td>
+    </tr>
+    <tr>
+      <td><strong>GZip</strong></td>
+      <td>`.gz`, `.gzip`</td>
+      <td>no</td>
+    </tr>
+  </tbody>
+</table>
+
+
 [Back to top](#top)
 
 

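Editor's note, for illustration only (not part of the commit): a minimal usage sketch of the transparent decompression from the Java API. The class name and input path below are made-up placeholders; reading a gzipped file needs no extra configuration.

    // Hypothetical sketch -- the ".gz" extension alone triggers decompression.
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ReadCompressedFileExample {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Placeholder path; any supported FileSystem URI works the same way.
            DataSet<String> lines = env.readTextFile("hdfs:///path/to/input.gz");
            lines.print();
        }
    }
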
http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index cdc408d..0584b96 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -21,10 +21,16 @@ package org.apache.flink.api.common.io;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory;
+import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
+import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
@@ -68,10 +74,11 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
        private static long DEFAULT_OPENING_TIMEOUT;
 
        /**
-        * Files with that suffix are unsplittable at a file level
-        * and compressed.
+        * A mapping of file extensions to decompression algorithms based on DEFLATE. Such compressions lead to
+        * unsplittable files.
         */
-       protected static final String DEFLATE_SUFFIX = ".deflate";
+       protected static final Map<String, InflaterInputStreamFactory<?>> INFLATER_INPUT_STREAM_FACTORIES =
+                       new HashMap<String, InflaterInputStreamFactory<?>>();
        
        /**
         * The splitLength is set to -1L for reading the whole split.
@@ -80,6 +87,7 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
        
        static {
                initDefaultsFromConfiguration();
+               initDefaultInflaterInputStreamFactories();
        }
        
        private static void initDefaultsFromConfiguration() {
@@ -96,6 +104,52 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
                        DEFAULT_OPENING_TIMEOUT = to;
                }
        }
+
+       private static void initDefaultInflaterInputStreamFactories() {
+               InflaterInputStreamFactory<?>[] defaultFactories = {
+                               DeflateInflaterInputStreamFactory.getInstance(),
+                               GzipInflaterInputStreamFactory.getInstance()
+               };
+               for (InflaterInputStreamFactory<?> inputStreamFactory : defaultFactories) {
+                       for (String fileExtension : inputStreamFactory.getCommonFileExtensions()) {
+                               registerInflaterInputStreamFactory(fileExtension, inputStreamFactory);
+                       }
+               }
+       }
+
+       /**
+        * Registers a decompression algorithm through a {@link org.apache.flink.api.common.io.compression.InflaterInputStreamFactory}
+        * with a file extension for transparent decompression.
+        * @param fileExtension of the compressed files
+        * @param factory to create an {@link java.util.zip.InflaterInputStream} that handles the decompression format
+        */
+       public static void registerInflaterInputStreamFactory(String fileExtension, InflaterInputStreamFactory<?> factory) {
+               synchronized (INFLATER_INPUT_STREAM_FACTORIES) {
+                       if (INFLATER_INPUT_STREAM_FACTORIES.put(fileExtension, factory) != null) {
+                               LOG.warn("Overwriting an existing decompression algorithm for \"{}\" files.", fileExtension);
+                       }
+               }
+       }
+
+       protected static InflaterInputStreamFactory<?> getInflaterInputStreamFactory(String fileExtension) {
+               synchronized (INFLATER_INPUT_STREAM_FACTORIES) {
+                       return INFLATER_INPUT_STREAM_FACTORIES.get(fileExtension);
+               }
+       }
+
+       /**
+        * Returns the extension of a file name (!= a path).
+        * @return the extension of the file name or {@code null} if there is no extension.
+        */
+       protected static String extractFileExtension(String fileName) {
+               Preconditions.checkNotNull(fileName);
+               int lastPeriodIndex = fileName.lastIndexOf('.');
+               if (lastPeriodIndex < 0){
+                       return null;
+               } else {
+                       return fileName.substring(lastPeriodIndex + 1);
+               }
+       }
        
        static long getDefaultOpeningTimeout() {
                return DEFAULT_OPENING_TIMEOUT;
@@ -533,13 +587,23 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
        }
 
        protected boolean testForUnsplittable(FileStatus pathFile) {
-               if(pathFile.getPath().getName().endsWith(DEFLATE_SUFFIX)) {
+               if(getInflaterInputStreamFactory(pathFile.getPath()) != null) {
                        unsplittable = true;
                        return true;
                }
                return false;
        }
 
+       private InflaterInputStreamFactory<?> getInflaterInputStreamFactory(Path path) {
+               String fileExtension = extractFileExtension(path.getName());
+               if (fileExtension != null) {
+                       return getInflaterInputStreamFactory(fileExtension);
+               } else {
+                       return null;
+               }
+
+       }
+
        /**
         * A simple hook to filter files and directories from the input.
         * The method may be overridden. Hadoop's FileInputFormat has a similar mechanism and applies the
@@ -633,9 +697,10 @@ public abstract class FileInputFormat<OT> implements InputFormat<OT, FileInputSp
         * @see org.apache.flink.api.common.io.InputStreamFSInputWrapper
         */
        protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable {
-               // Wrap stream in a extracting (decompressing) stream if file ends with .deflate.
-               if (fileSplit.getPath().getName().endsWith(DEFLATE_SUFFIX)) {
-                       return new InflaterInputStreamFSInputWrapper(stream);
+               // Wrap stream in a extracting (decompressing) stream if file ends with a known compression file extension.
+               InflaterInputStreamFactory<?> inflaterInputStreamFactory = getInflaterInputStreamFactory(fileSplit.getPath());
+               if (inflaterInputStreamFactory != null) {
+                       return new InputStreamFSInputWrapper(inflaterInputStreamFactory.create(stream));
                }
 
                return inputStream;

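Editor's note, for illustration only (not part of the commit): with the registration hook added above, an additional file extension can be mapped to one of the shipped factories. The "gzipped" extension and class name below are invented for the example.

    // Hypothetical sketch: register an extra (made-up) extension for the built-in GZIP factory.
    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;

    public class RegisterExtraExtension {
        public static void main(String[] args) {
            // Extensions are registered without the leading period,
            // matching InflaterInputStreamFactory#getCommonFileExtensions().
            FileInputFormat.registerInflaterInputStreamFactory(
                    "gzipped", GzipInflaterInputStreamFactory.getInstance());
        }
    }
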
http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java
deleted file mode 100644
index ac87535..0000000
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.api.common.io;
-
-import org.apache.flink.core.fs.FSDataInputStream;
-
-import java.util.zip.InflaterInputStream;
-
-public class InflaterInputStreamFSInputWrapper extends InputStreamFSInputWrapper {
-
-       public InflaterInputStreamFSInputWrapper(FSDataInputStream inStream) {
-               super(new InflaterInputStream(inStream));
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java
new file mode 100644
index 0000000..20c79db
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/DeflateInflaterInputStreamFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io.compression;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Factory for input streams that decompress the "deflate" compression format.
+ */
+public class DeflateInflaterInputStreamFactory implements InflaterInputStreamFactory<InflaterInputStream> {
+
+       private static DeflateInflaterInputStreamFactory INSTANCE = null;
+
+       public static DeflateInflaterInputStreamFactory getInstance() {
+               if (INSTANCE == null) {
+                       INSTANCE = new DeflateInflaterInputStreamFactory();
+               }
+               return INSTANCE;
+       }
+
+       @Override
+       public InflaterInputStream create(InputStream in) throws IOException {
+               return new InflaterInputStream(in);
+       }
+
+       @Override
+       public Collection<String> getCommonFileExtensions() {
+               return Collections.singleton("deflate");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java
new file mode 100644
index 0000000..aebbac3
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/GzipInflaterInputStreamFactory.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.api.common.io.compression;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * Factory for input streams that decompress the GZIP compression format.
+ */
+public class GzipInflaterInputStreamFactory implements InflaterInputStreamFactory<GZIPInputStream> {
+
+       private static GzipInflaterInputStreamFactory INSTANCE = null;
+
+       public static GzipInflaterInputStreamFactory getInstance() {
+               if (INSTANCE == null) {
+                       INSTANCE = new GzipInflaterInputStreamFactory();
+               }
+               return INSTANCE;
+       }
+       @Override
+       public GZIPInputStream create(InputStream in) throws IOException {
+               return new GZIPInputStream(in);
+       }
+
+       @Override
+       public Collection<String> getCommonFileExtensions() {
+               return Arrays.asList("gz", "gzip");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java
new file mode 100644
index 0000000..a6787c5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/compression/InflaterInputStreamFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.io.compression;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.zip.InflaterInputStream;
+
+/**
+ * Creates a new instance of a certain subclass of {@link java.util.zip.InflaterInputStream}.
+ */
+public interface InflaterInputStreamFactory<T extends InflaterInputStream> {
+
+       /**
+        * Creates a {@link java.util.zip.InflaterInputStream} that wraps the given input stream.
+        * @param in is the compressed input stream
+        * @return the inflated input stream
+        */
+       T create(InputStream in) throws IOException;
+
+       /**
+        * Lists a collection of typical file extensions (e.g., "gz", "gzip") that are associated with the compression
+        * algorithm in the {@link java.util.zip.InflaterInputStream} {@code T}.
+        * @return a (possibly empty) collection of lower-case file extensions, without the period
+        */
+       Collection<String> getCommonFileExtensions();
+}

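Editor's note, for illustration only (not part of the commit): a sketch of a user-defined factory implementing the interface above. It inflates raw DEFLATE data (no zlib header) via Inflater(true); the class name and the "rawdeflate" extension are hypothetical.

    // Hypothetical sketch of a custom InflaterInputStreamFactory implementation.
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.zip.Inflater;
    import java.util.zip.InflaterInputStream;

    import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;

    public class RawDeflateInflaterInputStreamFactory implements InflaterInputStreamFactory<InflaterInputStream> {

        @Override
        public InflaterInputStream create(InputStream in) throws IOException {
            // nowrap = true expects raw DEFLATE data without the zlib header and checksum.
            return new InflaterInputStream(in, new Inflater(true));
        }

        @Override
        public Collection<String> getCommonFileExtensions() {
            // Made-up extension, registered without the leading period.
            return Collections.singleton("rawdeflate");
        }
    }
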
http://git-wip-us.apache.org/repos/asf/flink/blob/16ab1fd9/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
index 3749645..20b130c 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/GenericCsvInputFormatTest.java
@@ -30,6 +30,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
+import java.util.zip.DeflaterOutputStream;
+import java.util.zip.GZIPOutputStream;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -39,6 +41,7 @@ import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.types.Value;
+import org.jets3t.service.io.GZipDeflatingInputStream;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -119,6 +122,86 @@ public class GenericCsvInputFormatTest {
                        fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
                }
        }
+
+       @Test
+       public void testReadNoPosAllDeflate() throws IOException {
+               try {
+                       final String fileContent = "111|222|333|444|555\n666|777|888|999|000|";
+                       final FileInputSplit split = createTempDeflateFile(fileContent);
+
+                       final Configuration parameters = new Configuration();
+
+                       format.setFieldDelimiter("|");
+                       format.setFieldTypesGeneric(IntValue.class, IntValue.class, IntValue.class, IntValue.class, IntValue.class);
+
+                       format.configure(parameters);
+                       format.open(split);
+
+                       Value[] values = createIntValues(5);
+
+                       values = format.nextRecord(values);
+                       assertNotNull(values);
+                       assertEquals(111, ((IntValue) values[0]).getValue());
+                       assertEquals(222, ((IntValue) values[1]).getValue());
+                       assertEquals(333, ((IntValue) values[2]).getValue());
+                       assertEquals(444, ((IntValue) values[3]).getValue());
+                       assertEquals(555, ((IntValue) values[4]).getValue());
+
+                       values = format.nextRecord(values);
+                       assertNotNull(values);
+                       assertEquals(666, ((IntValue) values[0]).getValue());
+                       assertEquals(777, ((IntValue) values[1]).getValue());
+                       assertEquals(888, ((IntValue) values[2]).getValue());
+                       assertEquals(999, ((IntValue) values[3]).getValue());
+                       assertEquals(000, ((IntValue) values[4]).getValue());
+
+                       assertNull(format.nextRecord(values));
+                       assertTrue(format.reachedEnd());
+               }
+               catch (Exception ex) {
+                       fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+               }
+       }
+
+       @Test
+       public void testReadNoPosAllGzip() throws IOException {
+               try {
+                       final String fileContent = "111|222|333|444|555\n666|777|888|999|000|";
+                       final FileInputSplit split = createTempGzipFile(fileContent);
+
+                       final Configuration parameters = new Configuration();
+
+                       format.setFieldDelimiter("|");
+                       format.setFieldTypesGeneric(IntValue.class, IntValue.class, IntValue.class, IntValue.class, IntValue.class);
+
+                       format.configure(parameters);
+                       format.open(split);
+
+                       Value[] values = createIntValues(5);
+
+                       values = format.nextRecord(values);
+                       assertNotNull(values);
+                       assertEquals(111, ((IntValue) values[0]).getValue());
+                       assertEquals(222, ((IntValue) values[1]).getValue());
+                       assertEquals(333, ((IntValue) values[2]).getValue());
+                       assertEquals(444, ((IntValue) values[3]).getValue());
+                       assertEquals(555, ((IntValue) values[4]).getValue());
+
+                       values = format.nextRecord(values);
+                       assertNotNull(values);
+                       assertEquals(666, ((IntValue) values[0]).getValue());
+                       assertEquals(777, ((IntValue) values[1]).getValue());
+                       assertEquals(888, ((IntValue) values[2]).getValue());
+                       assertEquals(999, ((IntValue) values[3]).getValue());
+                       assertEquals(000, ((IntValue) values[4]).getValue());
+
+                       assertNull(format.nextRecord(values));
+                       assertTrue(format.reachedEnd());
+               }
+               catch (Exception ex) {
+                       fail("Test failed due to a " + ex.getClass().getSimpleName() + ": " + ex.getMessage());
+               }
+       }
        
        @Test
        public void testReadNoPosFirstN() throws IOException {
@@ -584,6 +667,28 @@ public class GenericCsvInputFormatTest {
                        
                return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
        }
+
+       private FileInputSplit createTempDeflateFile(String content) throws IOException {
+               this.tempFile = File.createTempFile("test_contents", "tmp.deflate");
+               this.tempFile.deleteOnExit();
+
+               DataOutputStream dos = new DataOutputStream(new DeflaterOutputStream(new FileOutputStream(tempFile)));
+               dos.writeBytes(content);
+               dos.close();
+
+               return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
+       }
+
+       private FileInputSplit createTempGzipFile(String content) throws IOException {
+               this.tempFile = File.createTempFile("test_contents", "tmp.gz");
+               this.tempFile.deleteOnExit();
+
+               DataOutputStream dos = new DataOutputStream(new GZIPOutputStream(new FileOutputStream(tempFile)));
+               dos.writeBytes(content);
+               dos.close();
+
+               return new FileInputSplit(0, new Path(this.tempFile.toURI().toString()), 0, this.tempFile.length(), new String[] {"localhost"});
+       }
        
        private final Value[] createIntValues(int num) {
                Value[] v = new Value[num];
