pradeepkth
Thu, 05 Nov 2009 13:44:52 -0800
Author: pradeepkth Date: Thu Nov 5 21:44:24 2009 New Revision: 833193 URL: http://svn.apache.org/viewvc?rev=833193&view=rev Log: PIG-958: Splitting output data on key field (ankur via pradeepkth)
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
Modified:
hadoop/pig/trunk/CHANGES.txt
Modified: hadoop/pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=833193&r1=833192&r2=833193&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Nov 5 21:44:24 2009
@@ -26,6 +26,8 @@
IMPROVEMENTS
+PIG-958: Splitting output data on key field (ankur via pradeepkth)
+
PIG-1058: FINDBUGS: remaining "Correctness Warnings" (olgan)
PIG-1036: Fragment-replicate left outer join (ankit.modi via pradeepkth)
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java?rev=833193&view=auto
==============================================================================
---
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
(added)
+++
hadoop/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
Thu Nov 5 21:44:24 2009
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding
copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
"License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License
at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.
+ * See the License for the specific language governing permissions and
limitations under the License.
+ */
+package org.apache.pig.piggybank.storage;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.StoreFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.builtin.Utf8StorageConverter;
+import org.apache.pig.data.Tuple;
+import org.apache.tools.bzip2r.CBZip2OutputStream;
+
+/**
+ * The UDF is useful for splitting the output data into a bunch of directories
+ * and files dynamically based on a user-specified key field in the output tuple.
+ *
+ * Sample usage: <code>
+ * A = LOAD 'mydata' USING PigStorage() as (a, b, c);
+ * STORE A INTO '/my/home/output' USING MultiStorage('/my/home/output','0', 'bz2', '\\t');
+ * </code> Parameter details:- ========== <b>/my/home/output </b>(Required) :
+ * The DFS path where output directories and files will be created. <b> 0
+ * </b>(Required) : Index of the field whose values should be used to create
+ * directories and files (field 'a' in this case). <b>'bz2' </b>(Optional) : The
+ * compression type. Default is 'none'. Supported types are:- 'none', 'gz', 'bz'
+ * and 'bz2'. <b> '\\t' </b>(Optional) : Output field separator.
+ *
+ * Let 'a1', 'a2' be the unique values of field 'a'. Then output may look like
+ * this
+ *
+ * /my/home/output/a1/a1-00000 /my/home/output/a1/a1-00001
+ * /my/home/output/a1/a1-00002 ... /my/home/output/a2/a2-00000
+ * /my/home/output/a2/a2-00001 /my/home/output/a2/a2-00002
+ *
+ * The suffix '0000*' is the partition number of the mapper/reducer task
+ * executing this store, zero-padded to at least five digits. When compression
+ * is enabled the file name additionally ends in '.gz' or '.bz2'. In case the
+ * user does a GROUP BY on the field followed by MultiStorage(), it is
+ * guaranteed that all tuples for a particular group go to exactly one reducer.
+ * So in the above case, for example, there will be only one file each under
+ * the 'a1' and 'a2' directories.
+ */
+public class MultiStorage extends Utf8StorageConverter implements StoreFunc {
+
+ // map of (key field value -> PigStorage writing that value's file) for every value seen by this store
+ private Map<String, PigStorage> storeMap;
+ private List<OutputStream> outStreamList; // every stream opened by this store; flushed and closed in finish()
+ private boolean isAbsolute; // Is the user specified output path absolute. NOTE(review): never assigned in this class, so it keeps its default of false
+ private String partition; // Zero-padded partition ID of the task executing this store
+ private Path outputPath; // User specified output Path
+ private Path workOutputPath; // Task specific temporary output path, suffixed with outputPath
+ private Compression comp; // Compression type of output data.
+ private int splitFieldIndex = -1; // Index of the key field
+ private String fieldDel; // delimiter of the output record, handed to each PigStorage.
+ private FileSystem fs; // Output file system; set lazily on the first putNext()
+
+ // Keeps only visible entries: names beginning with '_' or '.' are rejected
+ public static final PathFilter hiddenPathFilter = new PathFilter() {
+   public boolean accept(Path p) {
+     final String baseName = p.getName();
+     final boolean hidden = baseName.startsWith("_") || baseName.startsWith(".");
+     return !hidden;
+   }
+ };
+
+ // Compression types supported by this store: 'bz' and 'bz2' both select bzip2 output, 'gz' selects gzip output, 'none' writes uncompressed data
+ enum Compression {
+ none, bz2, bz, gz;
+ };
+
+ /**
+  * Creates a store that writes uncompressed output with the default field
+  * delimiter string "\\t".
+  *
+  * @param parentPathStr
+  *          Parent output dir path
+  * @param splitFieldIndex
+  *          key field index
+  */
+ public MultiStorage(String parentPathStr, String splitFieldIndex) {
+   this(parentPathStr, splitFieldIndex, "none", "\\t");
+ }
+
+ /**
+  * Creates a store with the given compression and the default field
+  * delimiter string "\\t".
+  *
+  * @param parentPathStr
+  *          Parent output dir path
+  * @param splitFieldIndex
+  *          key field index
+  * @param compression
+  *          'bz2', 'bz', 'gz' or 'none'
+  */
+ public MultiStorage(String parentPathStr, String splitFieldIndex,
+     String compression) {
+   this(parentPathStr, splitFieldIndex, compression, "\\t");
+ }
+
+ /**
+  * Builds a store that splits its output by the value of one tuple field.
+  *
+  * @param parentPathStr
+  *          Parent output dir path
+  * @param splitFieldIndex
+  *          key field index as a decimal string; parsed with
+  *          Integer.parseInt
+  * @param compression
+  *          'bz2', 'bz', 'gz' or 'none' (matched after lower-casing). A null
+  *          value means 'none'; an unrecognized value is reported on stderr
+  *          and treated as 'none'.
+  * @param fieldDel
+  *          Output record field delimiter.
+  */
+ public MultiStorage(String parentPathStr, String splitFieldIndex,
+     String compression, String fieldDel) {
+   this.outputPath = new Path(parentPathStr);
+   this.splitFieldIndex = Integer.parseInt(splitFieldIndex);
+   this.fieldDel = fieldDel;
+   this.storeMap = new HashMap<String, PigStorage>();
+   this.outStreamList = new ArrayList<OutputStream>();
+
+   // Start from "no compression" and override only on a successful parse.
+   Compression requested = Compression.none;
+   if (compression != null) {
+     try {
+       requested = Compression.valueOf(compression.toLowerCase());
+     } catch (IllegalArgumentException e) {
+       System.err.println("Exception when converting compression string: "
+           + compression + " to enum. No compression will be used");
+     }
+   }
+   this.comp = requested;
+ }
+
+ /**
+  * Computes the path under which this task writes its files: the task's work
+  * output path with the user-specified output path appended. Without a job
+  * configuration the user-specified output path is used as is.
+  *
+  * @param conf
+  *          job configuration, may be null
+  * @return the path to write under
+  * @throws IOException
+  */
+ private Path getWorkOutputPath(JobConf conf) throws IOException {
+   if (conf == null) {
+     return this.outputPath;
+   }
+   Path taskWorkDir = FileOutputFormat.getWorkOutputPath(conf);
+   return new Path(taskWorkDir, this.outputPath);
+ }
+
+ /**
+ * Get the partition number of the reduce task in which it is executing.
+ *
+ * @param conf
+ * @return
+ */
+ private String getPartition(JobConf conf) {
+ int part = (conf != null) ? conf.getInt("mapred.task.partition", -1) : 0;
+ NumberFormat numberFormat = NumberFormat.getInstance();
+ numberFormat.setMinimumIntegerDigits(5);
+ numberFormat.setGroupingUsed(false);
+ return numberFormat.format(part);
+ }
+
+ /**
+  * Lazily initializes the task-specific state from PigMapReduce.sJobConf: the
+  * partition string, the work output path (the task's work output path
+  * suffixed with the user-specified output path; the task-specific part is
+  * stripped again when results are moved) and the output file system. Values
+  * that are already set are left untouched. Without a job configuration the
+  * local file system is used.
+  *
+  * @throws IOException
+  */
+ private void initJobSpecificParams() throws IOException {
+   if (this.partition == null) {
+     this.partition = getPartition(PigMapReduce.sJobConf);
+   }
+   if (this.workOutputPath == null) {
+     this.workOutputPath = getWorkOutputPath(PigMapReduce.sJobConf);
+   }
+   if (this.fs == null) {
+     if (PigMapReduce.sJobConf == null) {
+       this.fs = FileSystem.getLocal(new Configuration());
+     } else {
+       this.fs = FileSystem.get(PigMapReduce.sJobConf);
+     }
+   }
+ }
+
+ @Override
+ public void bindTo(OutputStream os) throws IOException {
+ // Intentionally a no-op: the supplied stream is ignored because each tuple is
+ // written to a stream this store opens itself, chosen by the tuple's key field value
+ }
+
+ /**
+ * Create an appropriate output stream for the fieldValue.
+ *
+ * @param fieldValue
+ * @return
+ * @throws IOException
+ */
+ private OutputStream createOutputStream(String fieldValue) throws
IOException {
+ Path path = new Path(fieldValue, fieldValue + '-' + partition);
+ Path fieldValueBasedPath = new Path(workOutputPath, path);
+ OutputStream os = null;
+ switch (comp) {
+ case bz:
+ case bz2:
+ os = fs.create(fieldValueBasedPath.suffix(".bz2"), false);
+ os = new CBZip2OutputStream(os);
+ break;
+ case gz:
+ os = fs.create(fieldValueBasedPath.suffix(".gz"), false);
+ os = new GZIPOutputStream(os);
+ break;
+ case none:
+ os = fs.create(fieldValueBasedPath, false);
+ }
+ return os;
+ }
+
+ /**
+  * Looks up the PigStorage that handles the given key field value, creating
+  * it (together with its output stream) the first time the value is seen.
+  *
+  * @param fieldValue
+  *          string form of the key field value
+  * @return the storage bound to that value's output stream
+  * @throws IOException
+  */
+ private PigStorage getStore(String fieldValue) throws IOException {
+   PigStorage existing = storeMap.get(fieldValue);
+   if (existing != null) {
+     return existing;
+   }
+   PigStorage created = new PigStorage(fieldDel);
+   OutputStream stream = createOutputStream(fieldValue);
+   created.bindTo(stream);
+   // remember the stream so finish() can flush and close it
+   outStreamList.add(stream);
+   storeMap.put(fieldValue, created);
+   return created;
+ }
+
+ /**
+  * Writes the tuple to the file that belongs to the value of its key field.
+  *
+  * @param tuple
+  *          the record to store
+  * @throws IOException
+  *           if the tuple has no field at the key index, if the field cannot
+  *           be read, or if writing fails
+  */
+ @Override
+ public void putNext(Tuple tuple) throws IOException {
+   initJobSpecificParams();
+   if (splitFieldIndex >= tuple.size()) {
+     throw new IOException("split field index:" + this.splitFieldIndex
+         + " >= tuple size:" + tuple.size());
+   }
+   final Object keyValue;
+   try {
+     keyValue = tuple.get(splitFieldIndex);
+   } catch (ExecException exec) {
+     throw new IOException(exec);
+   }
+   // a null field is stored under the literal key "null" (String.valueOf)
+   getStore(String.valueOf(keyValue)).putNext(tuple);
+ }
+
+ /**
+  * Calls pigStorage.finish() for each pigStorage object, then flushes and
+  * closes every output stream. Clears the map of pigStorage objects and moves
+  * the final results to the correct location from the temporary output path
+  * since multiquery implementation might ignore our results.
+  *
+  * Every store and stream is finished/closed even if an earlier one fails;
+  * the first failure is rethrown afterwards and results are not moved in that
+  * case. If this task never received a tuple there is nothing to move.
+  *
+  * @throws IOException
+  *           the first failure seen while finishing stores or closing
+  *           streams, or a failure while moving the results
+  */
+ @Override
+ public void finish() throws IOException {
+   IOException firstFailure = null;
+
+   for (PigStorage store : storeMap.values()) {
+     try {
+       store.finish();
+     } catch (IOException e) {
+       if (firstFailure == null) {
+         firstFailure = e;
+       }
+     }
+   }
+   storeMap.clear();
+
+   for (OutputStream os : outStreamList) {
+     try {
+       try {
+         os.flush();
+       } finally {
+         // always release the stream, even when the flush fails
+         os.close();
+       }
+     } catch (IOException e) {
+       if (firstFailure == null) {
+         firstFailure = e;
+       }
+     }
+   }
+   outStreamList.clear();
+
+   if (firstFailure != null) {
+     throw firstFailure;
+   }
+
+   // move the results here; workOutputPath and fs are only set once a tuple
+   // has been stored, so a task without input has nothing to move
+   if (PigMapReduce.sJobConf != null && workOutputPath != null && fs != null) {
+     Path rem = FileOutputFormat.getWorkOutputPath(PigMapReduce.sJobConf);
+     String pathToRemove = rem.toUri().getPath() + (!isAbsolute ? "/" : "");
+     moveResults(workOutputPath, pathToRemove);
+   }
+ }
+
+ /**
+  * Moves the files and dirs under the given path 'p' to the actual path. The
+  * API traverses the tree recursively; for every non-hidden entry the target
+  * is the entry's path with 'rem' removed. Directories are created at the
+  * target and descended into, files are renamed to the target.
+  *
+  * @param p
+  *          The directory whose entries should be moved
+  * @param rem
+  *          the path fragment to remove from each entry's path
+  * @throws IOException
+  *           if listing fails, or a target directory cannot be created, or a
+  *           file cannot be renamed
+  */
+ private void moveResults(Path p, String rem) throws IOException {
+   FileStatus[] entries = fs.listStatus(p, hiddenPathFilter);
+   if (entries == null) {
+     // NOTE(review): some FileSystem versions appear to return null for a
+     // missing path - confirm; treated as "nothing to move"
+     return;
+   }
+   for (FileStatus fstat : entries) {
+     Path src = fstat.getPath();
+     String srcPath = src.toUri().getPath();
+     String dstPath = srcPath.replace(rem, "");
+     Path dst = new Path(dstPath);
+     if (fstat.isDir()) {
+       if (!fs.mkdirs(dst)) {
+         throw new IOException("Failed to create directory: " + dst);
+       }
+       moveResults(src, rem);
+     } else if (!srcPath.equals(dstPath)) {
+       // an unchanged path means the file is already at its final location
+       if (!fs.rename(src, dst)) {
+         throw new IOException("Failed to move " + src + " to " + dst);
+       }
+     }
+   }
+ }
+
+ // @Override
+ public Class getStorePreparationClass() throws IOException {
+ // This store supplies no store preparation class; always returns null
+ return null;
+ }
+}
Added:
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java?rev=833193&view=auto
==============================================================================
---
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
(added)
+++
hadoop/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorage.java
Thu Nov 5 21:44:24 2009
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the
+ * NOTICE file distributed with this work for additional information regarding
copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
"License"); you may not use this file
+ * except in compliance with the License. You may obtain a copy of the License
at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied.
+ * See the License for the specific language governing permissions and
limitations under the License.
+ */
+package org.apache.pig.piggybank.test.storage;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.test.MiniCluster;
+import org.apache.pig.test.Util;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+public class TestMultiStorage extends TestCase {
+ // Name of the input file, both on the local disk and on the mini cluster
+ private static final String INPUT_FILE = "MultiStorageInput.txt";
+
+ private PigServer pigServer; // runs queries against the mini cluster
+ private PigServer pigServerLocal; // runs queries in local mode
+
+ private MiniCluster cluster = MiniCluster.buildCluster();
+
+ /**
+  * Creates one PigServer for the mini cluster and one for local mode.
+  *
+  * @throws ExecException
+  * @throws IOException
+  */
+ public TestMultiStorage() throws ExecException, IOException {
+   this.pigServer = new PigServer(ExecType.MAPREDUCE, this.cluster.getProperties());
+   this.pigServerLocal = new PigServer(ExecType.LOCAL);
+ }
+
+ // Keeps only visible entries: names beginning with '_' or '.' are rejected
+ public static final PathFilter hiddenPathFilter = new PathFilter() {
+   public boolean accept(Path p) {
+     final String baseName = p.getName();
+     final boolean hidden = baseName.startsWith("_") || baseName.startsWith(".");
+     return !hidden;
+   }
+ };
+
+ private void createFile() throws IOException {
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("100\tapple\taaa1");
+ w.println("200\torange\tbbb1");
+ w.println("300\tstrawberry\tccc1");
+
+ w.println("101\tapple\taaa2");
+ w.println("201\torange\tbbb2");
+ w.println("301\tstrawberry\tccc2");
+
+ w.println("102\tapple\taaa3");
+ w.println("202\torange\tbbb3");
+ w.println("302\tstrawberry\tccc3");
+
+ w.close();
+ Util.deleteFile(cluster, INPUT_FILE);
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+ }
+
+ /**
+  * Creates the input file and removes the local 'local-out' and 'dummy'
+  * directories if they exist.
+  */
+ @Override
+ @Before
+ public void setUp() throws Exception {
+   createFile();
+   FileSystem localFs = FileSystem.getLocal(new Configuration());
+   for (String leftover : new String[] { "local-out", "dummy" }) {
+     Path stale = new Path(leftover);
+     if (localFs.exists(stale)) {
+       localFs.delete(stale, true);
+     }
+   }
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception {
+ new File(INPUT_FILE).delete();
+ Util.deleteFile(cluster, INPUT_FILE);
+ }
+
+ /** Execution modes the test can run in. */
+ enum Mode {
+   local,
+   cluster
+ }
+
+ /**
+  * Stores the input through MultiStorage on the mini cluster, splitting on
+  * field 1 (the name), and verifies the resulting directory layout.
+  *
+  * @throws IOException
+  */
+ @Test
+ public void testMultiStorage() throws IOException {
+   final String load = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);";
+   final String multiStoreCluster = "STORE A INTO 'mr-out' USING "
+       + "org.apache.pig.piggybank.storage.MultiStorage('mr-out', '1');";
+
+   // TODO: the local-mode run (STORE A INTO 'dummy' USING
+   // MultiStorage('local-out', '1'), verified under 'local-out') is disabled
+   // in this revision, so it must not be reported as a success.
+   System.out.println("Testing in LOCAL mode: ... skipped (disabled)");
+
+   System.out.print("Testing in CLUSTER mode: ...");
+   testMultiStorage(Mode.cluster, "mr-out", load, multiStoreCluster);
+   System.out.println("Succeeded!");
+ }
+
+ /**
+ * The actual method that run the test in local or cluster mode.
+ *
+ * @param pigServer
+ * @param mode
+ * @param queries
+ * @throws IOException
+ */
+ private void testMultiStorage( Mode mode, String outPath,
+ String... queries) throws IOException {
+ PigServer pigServer = (Mode.local == mode) ? this.pigServerLocal :
this.pigServer;
+ pigServer.setBatchOn();
+ for (String query : queries) {
+ pigServer.registerQuery(query);
+ }
+ pigServer.executeBatch();
+ verifyResults(mode, outPath);
+ }
+
+ /**
+  * Test if records are split into directories corresponding to split field
+  * values: every record found in a file under outPath/X must have three
+  * tab-separated fields, the second of which is X. Fails if there are no
+  * split directories or if no record at all could be checked.
+  *
+  * @param mode
+  *          selects the local or the cluster file system
+  * @param outPath
+  *          directory that holds the split output
+  * @throws IOException
+  */
+ private void verifyResults(Mode mode, String outPath) throws IOException {
+   FileSystem fs = (Mode.local == mode ? FileSystem
+       .getLocal(new Configuration()) : cluster.getFileSystem());
+   Path output = new Path(outPath);
+   Assert.assertTrue("Output dir does not exists!", fs.exists(output)
+       && fs.isDirectory(output));
+
+   Path[] paths = FileUtil.stat2Paths(fs.listStatus(output, hiddenPathFilter));
+   Assert.assertTrue("Split field dirs not found!",
+       paths != null && paths.length > 0);
+
+   int recordsChecked = 0;
+   for (Path path : paths) {
+     String splitField = path.getName();
+     Path[] files = FileUtil.stat2Paths(fs.listStatus(path, hiddenPathFilter));
+     Assert.assertTrue("No files found for path: " + path.toUri().getPath(),
+         files != null);
+     for (Path filePath : files) {
+       if (!fs.isFile(filePath)) {
+         continue;
+       }
+       BufferedReader reader = new BufferedReader(new InputStreamReader(fs
+           .open(filePath)));
+       try {
+         String line;
+         while ((line = reader.readLine()) != null) {
+           String[] fields = line.split("\\t");
+           // expected value first, so a failure message reads correctly
+           Assert.assertEquals(3, fields.length);
+           Assert.assertEquals("Unexpected field value in the output record",
+               splitField, fields[1]);
+           recordsChecked++;
+         }
+       } finally {
+         // close the reader even when an assertion fails
+         reader.close();
+       }
+     }
+   }
+   // guard against a vacuous pass when the store produced no records
+   Assert.assertTrue("No output records were verified", recordsChecked > 0);
+ }
+}