Repository: crunch Updated Branches: refs/heads/master 1a160b653 -> ac3863e80
Fix UTF8 encoded outputs in MemPipeline.write Project: http://git-wip-us.apache.org/repos/asf/crunch/repo Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ac3863e8 Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ac3863e8 Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ac3863e8 Branch: refs/heads/master Commit: ac3863e8019c33adae931d5d02e38e563e0b9c94 Parents: 1a160b6 Author: Josh Wills <[email protected]> Authored: Thu Feb 13 17:32:40 2014 -0800 Committer: Josh Wills <[email protected]> Committed: Fri Feb 14 15:05:29 2014 -0800 ---------------------------------------------------------------------- .../crunch/impl/mem/MemPipelineUTF8IT.java | 89 ++++++++++++++++++++ .../org/apache/crunch/impl/mem/MemPipeline.java | 14 +-- pom.xml | 10 ++- 3 files changed, 106 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/crunch/blob/ac3863e8/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java new file mode 100644 index 0000000..56b167a --- /dev/null +++ b/crunch-core/src/it/java/org/apache/crunch/impl/mem/MemPipelineUTF8IT.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.crunch.impl.mem; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.nio.charset.Charset; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import junit.framework.Assert; + +import org.apache.crunch.PCollection; +import org.apache.crunch.Pipeline; +import org.apache.crunch.Target; +import org.apache.crunch.Target.WriteMode; +import org.apache.crunch.impl.mr.MRPipeline; +import org.apache.crunch.io.text.TextFileTarget; +import org.apache.crunch.test.TemporaryPath; +import org.apache.crunch.test.TemporaryPaths; +import org.junit.Rule; +import org.junit.Test; + +public class MemPipelineUTF8IT { + + @Rule + public TemporaryPath baseTmpDir = TemporaryPaths.create(); + + private static void writeFile(String text, String filename) throws IOException { + Files.write(text, new File(filename), Charsets.UTF_8); + } + + @Test + public void testText() throws Exception { + + final String infilename = baseTmpDir.getFileName("input"); + final String memOutFilename = baseTmpDir.getFileName("memPipelineOut"); + final String mrOutFilename = baseTmpDir.getFileName("mrPipelineOut"); + final String expected = "súper"; + + new File(infilename).getParentFile().mkdirs(); + + writeFile(expected, infilename); + + Pipeline memPipeline = MemPipeline.getInstance(); + PCollection<String> memPColl = memPipeline.readTextFile(infilename); + Target memTarget = new TextFileTarget(memOutFilename); + memPipeline.write(memPColl, memTarget, WriteMode.OVERWRITE); + memPipeline.run(); + File outDir = new File(memOutFilename); + File actualMemOut = null; + for (File f : outDir.listFiles()) { + String name = f.getName(); + if (name.contains("out") && name.endsWith(".txt")) { + actualMemOut = f; + break; + } + } + String actualMemText = Files.readFirstLine(actualMemOut, Charsets.UTF_8); + + Pipeline mrPipeline = new MRPipeline(getClass()); + PCollection<String> mrPColl = mrPipeline.readTextFile(infilename); + Target mrTarget = new TextFileTarget(mrOutFilename); + mrPipeline.write(mrPColl, mrTarget, WriteMode.OVERWRITE); + mrPipeline.run(); + String actualMrText = Files.readFirstLine(new File(mrOutFilename + "/part-m-00000"), Charsets.UTF_8); + + Assert.assertEquals("MR file mismatch", expected, actualMrText); + Assert.assertEquals("Mem file mismatch", expected, actualMemText); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/crunch/blob/ac3863e8/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java ---------------------------------------------------------------------- diff --git a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java index 7ef9f4f..b3e9c54 100644 --- a/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java +++ b/crunch-core/src/main/java/org/apache/crunch/impl/mem/MemPipeline.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import com.google.common.base.Charsets; import org.apache.avro.file.DataFileWriter; import org.apache.avro.io.DatumWriter; import org.apache.commons.logging.Log; @@ -211,17 +212,20 @@ public class MemPipeline implements Pipeline { LOG.warn("Defaulting to write to a text file from MemPipeline"); Path outputPath = new Path(path, "out" + outputIndex + ".txt"); FSDataOutputStream os = fs.create(outputPath); + byte[] newLine = "\r\n".getBytes(Charsets.UTF_8); if (collection instanceof PTable) { + byte[] tab = "\t".getBytes(Charsets.UTF_8); for (Object o : collection.materialize()) { Pair p = (Pair) o; - os.writeBytes(p.first().toString()); - os.writeBytes("\t"); - os.writeBytes(p.second().toString()); - os.writeBytes("\r\n"); + os.write(p.first().toString().getBytes(Charsets.UTF_8)); + os.write(tab); + os.write(p.second().toString().getBytes(Charsets.UTF_8)); + os.write(newLine); } } else { for (Object o : collection.materialize()) { - os.writeBytes(o.toString() + "\r\n"); + os.write(o.toString().getBytes(Charsets.UTF_8)); + os.write(newLine); } } os.close(); http://git-wip-us.apache.org/repos/asf/crunch/blob/ac3863e8/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 77def0c..8812f5e 100644 --- a/pom.xml +++ b/pom.xml @@ -643,7 +643,10 @@ under the License. <artifactId>maven-surefire-plugin</artifactId> <version>2.12</version> <configuration> - <argLine>-Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G</argLine> + <encoding>UTF-8</encoding> + <inputEncoding>UTF-8</inputEncoding> + <outputEncoding>UTF-8</outputEncoding> + <argLine>-Xmx2G -XX:PermSize=512m -XX:MaxPermSize=1G -Dfile.encoding=UTF-8</argLine> </configuration> </plugin> <plugin> @@ -827,7 +830,10 @@ under the License. <artifactId>maven-failsafe-plugin</artifactId> <version>2.12</version> <configuration> - <argLine>-Xmx1G</argLine> + <encoding>UTF-8</encoding> + <inputEncoding>UTF-8</inputEncoding> + <outputEncoding>UTF-8</outputEncoding> + <argLine>-Xmx1G -Dfile.encoding=UTF-8</argLine> <testSourceDirectory>${basedir}/src/it/java</testSourceDirectory> </configuration> <executions>
