Author: daijy Date: Thu Dec 1 21:19:02 2016 New Revision: 1772278 URL: http://svn.apache.org/viewvc?rev=1772278&view=rev Log: PIG-4901: To use Multistorage for each Group
Modified: pig/trunk/CHANGES.txt pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1772278&r1=1772277&r2=1772278&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Thu Dec 1 21:19:02 2016 @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-4901: To use Multistorage for each Group (szita via daijy) + PIG-5025: Fix flaky test failures in TestLoad.java (szita via rohini) PIG-4939: QueryParserUtils.setHdfsServers(QueryParserUtils.java:104) should not be called for non-dfs Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java?rev=1772278&r1=1772277&r2=1772278&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java (original) +++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java Thu Dec 1 21:19:02 2016 @@ -16,7 +16,9 @@ import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.IOException; import java.text.NumberFormat; +import java.util.ArrayList; import java.util.HashMap; +import java.util.List; import java.util.Map; import org.apache.hadoop.conf.Configuration; @@ -42,6 +44,9 @@ import org.apache.pig.backend.executione import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration; import org.apache.pig.data.Tuple; import org.apache.pig.impl.util.StorageUtil; +import
org.apache.xml.utils.StringBufferPool; + +import com.google.common.base.Strings; /** * The UDF is useful for splitting the output data into a bunch of directories @@ -73,13 +78,21 @@ import org.apache.pig.impl.util.StorageU * If the output is compressed,then the sub directories and the output files will * be having the extension. Say for example in the above case if bz2 is used one file * will look like ;/my/home/output.bz2/a1.bz2/a1-0000.bz2 + * + * Key field can also be a comma separated list of indices e.g. '0,1' - in this case + * storage will be multi-level: + * /my/home/output/a1/b1/a1-b1-0000 + * /my/home/output/a1/b2/a1-b2-0000 + * There is also an option to leave key values out of storage, see isRemoveKeys. */ public class MultiStorage extends StoreFunc { + private static final String KEYFIELD_DELIMETER = ","; private Path outputPath; // User specified output Path - private int splitFieldIndex = -1; // Index of the key field + private final List<Integer> splitFieldIndices= new ArrayList<Integer>(); // Indices of the key fields private String fieldDel; // delimiter of the output record. private Compression comp; // Compression type of output data. + private boolean isRemoveKeys = false; // Compression types supported by this store enum Compression { @@ -95,9 +108,14 @@ public class MultiStorage extends StoreF this(parentPathStr, splitFieldIndex, compression, "\\t"); } + public MultiStorage(String parentPathStr, String splitFieldIndex, + String compression, String fieldDel) { + this(parentPathStr, splitFieldIndex, compression, fieldDel, "false"); + } + /** * Constructor - * + * * @param parentPathStr * Parent output dir path (this will be specified in store statement, * so MultiStorage don't use this parameter in reality. However, we don't @@ -108,18 +126,26 @@ public class MultiStorage extends StoreF * 'bz2', 'bz', 'gz' or 'none' * @param fieldDel * Output record field delimiter. + * @param isRemoveKeys + * Removes key columns from result during write. 
*/ public MultiStorage(String parentPathStr, String splitFieldIndex, - String compression, String fieldDel) { + String compression, String fieldDel, String isRemoveKeys) { + this.isRemoveKeys = Boolean.parseBoolean(isRemoveKeys); this.outputPath = new Path(parentPathStr); - this.splitFieldIndex = Integer.parseInt(splitFieldIndex); + + String[] splitFieldIndices = splitFieldIndex.split(KEYFIELD_DELIMETER); + for (String splitFieldIndexString : splitFieldIndices){ + this.splitFieldIndices.add(Integer.parseInt(splitFieldIndexString)); + } + this.fieldDel = fieldDel; try { this.comp = (compression == null) ? Compression.none : Compression - .valueOf(compression.toLowerCase()); + .valueOf(compression.toLowerCase()); } catch (IllegalArgumentException e) { System.err.println("Exception when converting compression string: " - + compression + " to enum. No compression will be used"); + + compression + " to enum. No compression will be used"); this.comp = Compression.none; } } @@ -127,22 +153,26 @@ public class MultiStorage extends StoreF //-------------------------------------------------------------------------- // Implementation of StoreFunc - private RecordWriter<String, Tuple> writer; + private RecordWriter<List<String>, Tuple> writer; @Override public void putNext(Tuple tuple) throws IOException { - if (tuple.size() <= splitFieldIndex) { - throw new IOException("split field index:" + this.splitFieldIndex - + " >= tuple size:" + tuple.size()); + for (int splitFieldIndex : this.splitFieldIndices) { + if (tuple.size() <= splitFieldIndex) { + throw new IOException("split field index:" + splitFieldIndex + + " >= tuple size:" + tuple.size()); + } } - Object field = null; - try { - field = tuple.get(splitFieldIndex); - } catch (ExecException exec) { - throw new IOException(exec); + List<String> fields = new ArrayList<String>(); + for (int splitFieldIndex : this.splitFieldIndices){ + try { + fields.add(String.valueOf(tuple.get(splitFieldIndex))); + } catch (ExecException exec) 
{ + throw new IOException(exec); + } } try { - writer.write(String.valueOf(field), tuple); + writer.write(fields, tuple); } catch (InterruptedException e) { throw new IOException(e); } @@ -153,6 +183,9 @@ public class MultiStorage extends StoreF public OutputFormat getOutputFormat() throws IOException { MultiStorageOutputFormat format = new MultiStorageOutputFormat(); format.setKeyValueSeparator(fieldDel); + if (this.isRemoveKeys){ + format.setSkipIndices(this.splitFieldIndices); + } return format; } @@ -184,22 +217,23 @@ public class MultiStorage extends StoreF // Implementation of OutputFormat public static class MultiStorageOutputFormat extends - TextOutputFormat<String, Tuple> { + TextOutputFormat<List<String>, Tuple> { private String keyValueSeparator = "\\t"; private byte fieldDel = '\t'; - + private List<Integer> skipIndices = null; + @Override - public RecordWriter<String, Tuple> + public RecordWriter<List<String>, Tuple> getRecordWriter(TaskAttemptContext context ) throws IOException, InterruptedException { final TaskAttemptContext ctx = context; - return new RecordWriter<String, Tuple>() { + return new RecordWriter<List<String>, Tuple>() { - private Map<String, MyLineRecordWriter> storeMap = - new HashMap<String, MyLineRecordWriter>(); + private Map<List<String>, MyLineRecordWriter> storeMap = + new HashMap<List<String>, MyLineRecordWriter>(); private static final int BUFFER_SIZE = 1024; @@ -207,7 +241,7 @@ public class MultiStorage extends StoreF new ByteArrayOutputStream(BUFFER_SIZE); @Override - public void write(String key, Tuple val) throws IOException { + public void write(List<String> key, Tuple val) throws IOException { int sz = val.size(); for (int i = 0; i < sz; i++) { Object field; @@ -217,9 +251,13 @@ public class MultiStorage extends StoreF throw ee; } - StorageUtil.putField(mOut, field); + boolean skipCurrentField = skipIndices != null && skipIndices.contains(i); + + if (!skipCurrentField) { + StorageUtil.putField(mOut, field); + } - if (i 
!= sz - 1) { + if (i != sz - 1 && !skipCurrentField) { mOut.write(fieldDel); } } @@ -236,17 +274,17 @@ public class MultiStorage extends StoreF } } - private MyLineRecordWriter getStore(String fieldValue) throws IOException { - MyLineRecordWriter store = storeMap.get(fieldValue); + private MyLineRecordWriter getStore(List<String> fieldValues) throws IOException { + MyLineRecordWriter store = storeMap.get(fieldValues); if (store == null) { - DataOutputStream os = createOutputStream(fieldValue); + DataOutputStream os = createOutputStream(fieldValues); store = new MyLineRecordWriter(os, keyValueSeparator); - storeMap.put(fieldValue, store); + storeMap.put(fieldValues, store); } return store; } - private DataOutputStream createOutputStream(String fieldValue) throws IOException { + private DataOutputStream createOutputStream(List<String> fieldValues) throws IOException { Configuration conf = ctx.getConfiguration(); TaskID taskId = ctx.getTaskAttemptID().getTaskID(); @@ -264,7 +302,21 @@ public class MultiStorage extends StoreF NumberFormat nf = NumberFormat.getInstance(); nf.setMinimumIntegerDigits(4); - Path path = new Path(fieldValue+extension, fieldValue + '-' + StringBuffer pathStringBuffer = new StringBuffer(); + for (String fieldValue : fieldValues){ + String safeFieldValue = fieldValue.replaceAll("\\/","-"); + pathStringBuffer.append(safeFieldValue); + pathStringBuffer.append("/"); + } + pathStringBuffer.deleteCharAt(pathStringBuffer.length()-1); + String pathString = pathStringBuffer.toString(); + String idString = pathString.replaceAll("\\/","-"); + + if (!Strings.isNullOrEmpty(extension)){ + pathString = pathString.replaceAll("\\/",extension+"\\/"); + } + + Path path = new Path(pathString+extension, idString + '-' + nf.format(taskId.getId())+extension); Path workOutputPath = ((FileOutputCommitter)getOutputCommitter(ctx)).getWorkPath(); Path file = new Path(workOutputPath, path); @@ -284,8 +336,12 @@ public class MultiStorage extends StoreF keyValueSeparator = 
sep; fieldDel = StorageUtil.parseFieldDel(keyValueSeparator); } - - //------------------------------------------------------------------------ + + public void setSkipIndices(List<Integer> skipIndices) { + this.skipIndices = skipIndices; + } + + //------------------------------------------------------------------------ // protected static class MyLineRecordWriter Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java?rev=1772278&r1=1772277&r2=1772278&view=diff ============================================================================== --- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java (original) +++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java Thu Dec 1 21:19:02 2016 @@ -21,11 +21,14 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Set; import junit.framework.TestCase; +import org.apache.commons.io.FileUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.compress.BZip2Codec; @@ -37,6 +40,10 @@ import org.apache.pig.backend.executione import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.test.Util; +import com.google.common.collect.Sets; + +import org.junit.Assert; + public class TestMultiStorageCompression extends TestCase { private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)"; @@ -59,8 +66,8 @@ public class TestMultiStorageCompression filesToDelete.add(outputPath); try { - runQuery(outputPath, type); - verifyResults(type, 
filesToDelete, outputPath); + runQuery(outputPath, "0", type); + verifyResults(type, outputPath); } finally { cleanUpDirs(filesToDelete); } @@ -77,22 +84,22 @@ public class TestMultiStorageCompression filesToDelete.add(outputPath); try { - runQuery(outputPath, type); - verifyResults(type, filesToDelete, outputPath); + runQuery(outputPath, "0", type); + verifyResults(type, outputPath); } finally { cleanUpDirs(filesToDelete); } } - private void cleanUpDirs(List<String> filesToDelete) { + private void cleanUpDirs(List<String> filesToDelete) throws IOException { // Delete files recursively Collections.reverse(filesToDelete); for (String string : filesToDelete) - new File(string).delete(); + FileUtils.deleteDirectory(new File(string)); } - private void verifyResults(String type, List<String> filesToDelete, + private void verifyResults(String type, String outputPath) throws IOException, FileNotFoundException { // Verify the output File outputDir = new File(outputPath); @@ -114,12 +121,10 @@ public class TestMultiStorageCompression continue; String topFolder = outputPath + File.separator + indexFolder; File indexFolderFile = new File(topFolder); - filesToDelete.add(topFolder); String[] list = indexFolderFile.list(); for (String outputFile : list) { String file = topFolder + File.separator + outputFile; - filesToDelete.add(file); // Skip off any file starting with . 
if (outputFile.startsWith(".")) @@ -159,7 +164,7 @@ public class TestMultiStorageCompression } } - private void runQuery(String outputPath, String compressionType) + private void runQuery(String outputPath, String keyColIndices, String compressionType) throws Exception, ExecException, IOException, FrontendException { // create a data file @@ -172,7 +177,7 @@ public class TestMultiStorageCompression String query2 = "STORE A INTO '" + Util.encodeEscape(outputPath) + "' USING org.apache.pig.piggybank.storage.MultiStorage" + "('" - + Util.encodeEscape(outputPath) + "','0', '" + compressionType + "', '\\t');"; + + Util.encodeEscape(outputPath) + "','"+keyColIndices+"', '" + compressionType + "', '\\t');"; // Run Pig pig.setBatchOn(); @@ -182,5 +187,32 @@ public class TestMultiStorageCompression pig.executeBatch(); } + public void testMultiStorageShouldSupportMultiLevelAndGz() throws Exception { + String type = "gz"; + String outputDir = "output001.multi." + type; + List<String> filesToDelete = new ArrayList<String>(); + + String tmpDir = System.getProperty("java.io.tmpdir"); + String outputPath = tmpDir + File.separator + outputDir; + + filesToDelete.add(outputPath); + try { + runQuery(outputPath, "1,0", type); + Collection<File> fileList = FileUtils.listFiles(new File(outputPath),null,true); + Set<String> expectedPaths = Sets.newHashSet( "output001.multi.gz/a.gz/f1.gz/a-f1-0,000.gz", + "output001.multi.gz/b.gz/f2.gz/b-f2-0,000.gz", + "output001.multi.gz/c.gz/f3.gz/c-f3-0,000.gz", + "output001.multi.gz/d.gz/f4.gz/d-f4-0,000.gz"); + for (File file : fileList){ + String foundPath = file.getAbsolutePath().substring(file.getAbsolutePath().indexOf(outputDir)); + if (expectedPaths.contains(foundPath)){ + expectedPaths.remove(foundPath); + } + } + Assert.assertTrue(expectedPaths.isEmpty()); + } finally { + cleanUpDirs(filesToDelete); + } + } }