Author: daijy
Date: Thu Dec  1 21:19:02 2016
New Revision: 1772278

URL: http://svn.apache.org/viewvc?rev=1772278&view=rev
Log:
PIG-4901: To use Multistorage for each Group

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
    pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1772278&r1=1772277&r2=1772278&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec  1 21:19:02 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4901: To use Multistorage for each Group (szita via daijy)
+
 PIG-5025: Fix flaky test failures in TestLoad.java (szita via rohini)
 
 PIG-4939: QueryParserUtils.setHdfsServers(QueryParserUtils.java:104) should not be called for non-dfs

Modified: pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java?rev=1772278&r1=1772277&r2=1772278&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java (original)
+++ pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java Thu Dec  1 21:19:02 2016
@@ -16,7 +16,9 @@ import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.text.NumberFormat;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
@@ -42,6 +44,9 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.StorageUtil;
+import org.apache.xml.utils.StringBufferPool;
+
+import com.google.common.base.Strings;
 
 /**
  * The UDF is useful for splitting the output data into a bunch of directories
@@ -73,13 +78,21 @@ import org.apache.pig.impl.util.StorageU
  * If the output is compressed,then the sub directories and the output files will
  * be having the extension. Say for example in the above case if bz2 is used one file
  * will look like ;/my/home/output.bz2/a1.bz2/a1-0000.bz2
+ *
+ * Key field can also be a comma separated list of indices e.g. '0,1' - in this case
+ * storage will be multi-level:
+ * /my/home/output/a1/b1/a1-b1-0000
+ * /my/home/output/a1/b2/a1-b2-0000
+ * There is also an option to leave key values out of storage, see isRemoveKeys.
  */
 public class MultiStorage extends StoreFunc {
 
+  private static final String KEYFIELD_DELIMETER = ",";
   private Path outputPath; // User specified output Path
-  private int splitFieldIndex = -1; // Index of the key field
+  private final List<Integer> splitFieldIndices= new ArrayList<Integer>(); // Indices of the key fields
   private String fieldDel; // delimiter of the output record.
   private Compression comp; // Compression type of output data.
+  private boolean isRemoveKeys = false;
   
   // Compression types supported by this store
   enum Compression {
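
For illustration, a minimal sketch of how the extended five-argument constructor (introduced below) can be invoked; the argument values here are hypothetical:

    // A comma-separated key index list yields multi-level output directories;
    // isRemoveKeys="true" drops the key columns from the written records.
    MultiStorage store = new MultiStorage(
        "/my/home/output", // parent output dir (also given in the STORE clause)
        "0,1",             // key field indices -> /my/home/output/<key0>/<key1>/...
        "gz",              // compression: 'bz2', 'bz', 'gz' or 'none'
        "\\t",             // output record field delimiter
        "true");           // isRemoveKeys
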
@@ -95,9 +108,14 @@ public class MultiStorage extends StoreF
     this(parentPathStr, splitFieldIndex, compression, "\\t");
   }
 
+  public MultiStorage(String parentPathStr, String splitFieldIndex,
+      String compression, String fieldDel) {
+    this(parentPathStr, splitFieldIndex, compression, fieldDel, "false");
+  }
+
   /**
    * Constructor
-   * 
+   *
    * @param parentPathStr
   *          Parent output dir path (this will be specified in store statement,
   *            so MultiStorage don't use this parameter in reality. However, we don't
@@ -108,18 +126,26 @@ public class MultiStorage extends StoreF
    *          'bz2', 'bz', 'gz' or 'none'
    * @param fieldDel
    *          Output record field delimiter.
+   * @param isRemoveKeys
+   *          Removes key columns from result during write.
    */
   public MultiStorage(String parentPathStr, String splitFieldIndex,
-      String compression, String fieldDel) {
+                      String compression, String fieldDel, String isRemoveKeys) {
+    this.isRemoveKeys = Boolean.parseBoolean(isRemoveKeys);
     this.outputPath = new Path(parentPathStr);
-    this.splitFieldIndex = Integer.parseInt(splitFieldIndex);
+
+    String[] splitFieldIndices = splitFieldIndex.split(KEYFIELD_DELIMETER);
+    for (String splitFieldIndexString : splitFieldIndices){
+      this.splitFieldIndices.add(Integer.parseInt(splitFieldIndexString));
+    }
+
     this.fieldDel = fieldDel;
     try {
       this.comp = (compression == null) ? Compression.none : Compression
-        .valueOf(compression.toLowerCase());
+              .valueOf(compression.toLowerCase());
     } catch (IllegalArgumentException e) {
       System.err.println("Exception when converting compression string: "
-          + compression + " to enum. No compression will be used");
+              + compression + " to enum. No compression will be used");
       this.comp = Compression.none;
     }
   }
@@ -127,22 +153,26 @@ public class MultiStorage extends StoreF
   //--------------------------------------------------------------------------
   // Implementation of StoreFunc
 
-  private RecordWriter<String, Tuple> writer;
+  private RecordWriter<List<String>, Tuple> writer;
   
   @Override
   public void putNext(Tuple tuple) throws IOException {
-    if (tuple.size() <= splitFieldIndex) {
-      throw new IOException("split field index:" + this.splitFieldIndex
-          + " >= tuple size:" + tuple.size());
+    for (int splitFieldIndex : this.splitFieldIndices) {
+      if (tuple.size() <= splitFieldIndex) {
+        throw new IOException("split field index:" + splitFieldIndex
+                + " >= tuple size:" + tuple.size());
+      }
     }
-    Object field = null;
-    try {
-      field = tuple.get(splitFieldIndex);
-    } catch (ExecException exec) {
-      throw new IOException(exec);
+    List<String> fields = new ArrayList<String>();
+    for (int splitFieldIndex : this.splitFieldIndices){
+      try {
+        fields.add(String.valueOf(tuple.get(splitFieldIndex)));
+      } catch (ExecException exec) {
+        throw new IOException(exec);
+      }
     }
     try {
-      writer.write(String.valueOf(field), tuple);
+      writer.write(fields, tuple);
     } catch (InterruptedException e) {
       throw new IOException(e);
     }
@@ -153,6 +183,9 @@ public class MultiStorage extends StoreF
   public OutputFormat getOutputFormat() throws IOException {
       MultiStorageOutputFormat format = new MultiStorageOutputFormat();
       format.setKeyValueSeparator(fieldDel);
+      if (this.isRemoveKeys){
+        format.setSkipIndices(this.splitFieldIndices);
+      }
       return format;
   }
     
@@ -184,22 +217,23 @@ public class MultiStorage extends StoreF
   // Implementation of OutputFormat
   
   public static class MultiStorageOutputFormat extends
-  TextOutputFormat<String, Tuple> {
+  TextOutputFormat<List<String>, Tuple> {
 
     private String keyValueSeparator = "\\t";
     private byte fieldDel = '\t';
-  
+    private List<Integer> skipIndices = null;
+
     @Override
-    public RecordWriter<String, Tuple> 
+    public RecordWriter<List<String>, Tuple>
     getRecordWriter(TaskAttemptContext context
                 ) throws IOException, InterruptedException {
     
       final TaskAttemptContext ctx = context;
         
-      return new RecordWriter<String, Tuple>() {
+      return new RecordWriter<List<String>, Tuple>() {
 
-        private Map<String, MyLineRecordWriter> storeMap = 
-              new HashMap<String, MyLineRecordWriter>();
+        private Map<List<String>, MyLineRecordWriter> storeMap =
+              new HashMap<List<String>, MyLineRecordWriter>();
           
         private static final int BUFFER_SIZE = 1024;
           
@@ -207,7 +241,7 @@ public class MultiStorage extends StoreF
               new ByteArrayOutputStream(BUFFER_SIZE);
                            
         @Override
-        public void write(String key, Tuple val) throws IOException {
+        public void write(List<String> key, Tuple val) throws IOException {
           int sz = val.size();
           for (int i = 0; i < sz; i++) {
             Object field;
@@ -217,9 +251,13 @@ public class MultiStorage extends StoreF
               throw ee;
             }
 
-            StorageUtil.putField(mOut, field);
+            boolean skipCurrentField = skipIndices != null && skipIndices.contains(i);
+
+            if (!skipCurrentField) {
+              StorageUtil.putField(mOut, field);
+            }
 
-            if (i != sz - 1) {
+            if (i != sz - 1 && !skipCurrentField) {
               mOut.write(fieldDel);
             }
           }
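
To make the new skip logic concrete, here is a minimal runnable stand-in sketch (not part of the commit; a plain List replaces the Pig Tuple and all names are made up). With skipIndices = [0], the tuple ("a1", "x", "y") is written as "x<TAB>y": both the key column and its delimiter are omitted.

    import java.io.ByteArrayOutputStream;
    import java.util.Arrays;
    import java.util.List;

    public class SkipKeysSketch {
      public static void main(String[] args) throws Exception {
        List<String> val = Arrays.asList("a1", "x", "y"); // stand-in for the Tuple
        List<Integer> skipIndices = Arrays.asList(0);     // key columns to drop
        ByteArrayOutputStream mOut = new ByteArrayOutputStream();
        byte fieldDel = '\t';
        int sz = val.size();
        for (int i = 0; i < sz; i++) {
          boolean skipCurrentField = skipIndices.contains(i);
          if (!skipCurrentField) {
            mOut.write(String.valueOf(val.get(i)).getBytes());
          }
          if (i != sz - 1 && !skipCurrentField) {
            mOut.write(fieldDel);
          }
        }
        System.out.println(mOut); // prints: x<TAB>y
      }
    }
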
@@ -236,17 +274,17 @@ public class MultiStorage extends StoreF
           }
         }
       
-        private MyLineRecordWriter getStore(String fieldValue) throws IOException {
-          MyLineRecordWriter store = storeMap.get(fieldValue);
+        private MyLineRecordWriter getStore(List<String> fieldValues) throws IOException {
+          MyLineRecordWriter store = storeMap.get(fieldValues);
           if (store == null) {                  
-            DataOutputStream os = createOutputStream(fieldValue);
+            DataOutputStream os = createOutputStream(fieldValues);
             store = new MyLineRecordWriter(os, keyValueSeparator);
-            storeMap.put(fieldValue, store);
+            storeMap.put(fieldValues, store);
           }
           return store;
         }
           
-        private DataOutputStream createOutputStream(String fieldValue) throws IOException {
+        private DataOutputStream createOutputStream(List<String> fieldValues) throws IOException {
           Configuration conf = ctx.getConfiguration();
           TaskID taskId = ctx.getTaskAttemptID().getTaskID();
           
@@ -264,7 +302,21 @@ public class MultiStorage extends StoreF
           NumberFormat nf = NumberFormat.getInstance();
           nf.setMinimumIntegerDigits(4);
 
-          Path path = new Path(fieldValue+extension, fieldValue + '-'
+          StringBuffer pathStringBuffer = new StringBuffer();
+          for (String fieldValue : fieldValues){
+            String safeFieldValue = fieldValue.replaceAll("\\/","-");
+            pathStringBuffer.append(safeFieldValue);
+            pathStringBuffer.append("/");
+          }
+          pathStringBuffer.deleteCharAt(pathStringBuffer.length()-1);
+          String pathString = pathStringBuffer.toString();
+          String idString = pathString.replaceAll("\\/","-");
+
+          if (!Strings.isNullOrEmpty(extension)){
+            pathString = pathString.replaceAll("\\/",extension+"\\/");
+          }
+
+          Path path = new Path(pathString+extension, idString + '-'
                 + nf.format(taskId.getId())+extension);
          Path workOutputPath = ((FileOutputCommitter)getOutputCommitter(ctx)).getWorkPath();
           Path file = new Path(workOutputPath, path);
@@ -284,8 +336,12 @@ public class MultiStorage extends StoreF
       keyValueSeparator = sep;
       fieldDel = StorageUtil.parseFieldDel(keyValueSeparator);  
     }
-  
-  //------------------------------------------------------------------------
+
+    public void setSkipIndices(List<Integer> skipIndices) {
+      this.skipIndices = skipIndices;
+    }
+
+    //------------------------------------------------------------------------
   //
   
     protected static class MyLineRecordWriter
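
For reference, a self-contained sketch of the path construction now performed in createOutputStream, assuming key field values "a1" and "b1", a ".gz" extension, and task id 0. The class name is made up, and the real code also guards the no-extension case via Strings.isNullOrEmpty:

    import java.util.Arrays;
    import java.util.List;

    public class MultiLevelPathSketch {
      public static void main(String[] args) {
        List<String> fieldValues = Arrays.asList("a1", "b1"); // key field values
        String extension = ".gz";                             // compression suffix

        // Join the key values with '/', sanitizing any '/' inside a value to '-'.
        StringBuilder sb = new StringBuilder();
        for (String fieldValue : fieldValues) {
          sb.append(fieldValue.replaceAll("/", "-")).append("/");
        }
        sb.deleteCharAt(sb.length() - 1);
        String pathString = sb.toString();                 // "a1/b1"
        String idString = pathString.replaceAll("/", "-"); // "a1-b1"

        // Give each intermediate directory the compression extension as well.
        if (!extension.isEmpty()) {
          pathString = pathString.replaceAll("/", extension + "/"); // "a1.gz/b1"
        }

        // Directory "a1.gz/b1.gz" plus file "a1-b1-0000.gz":
        System.out.println(pathString + extension + "/" + idString + "-0000" + extension);
      }
    }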

Modified: pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java
URL: http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java?rev=1772278&r1=1772277&r2=1772278&view=diff
==============================================================================
--- pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java (original)
+++ pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java Thu Dec  1 21:19:02 2016
@@ -21,11 +21,14 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 import junit.framework.TestCase;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.BZip2Codec;
@@ -37,6 +40,10 @@ import org.apache.pig.backend.executione
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.test.Util;
 
+import com.google.common.collect.Sets;
+
+import org.junit.Assert;
+
 public class TestMultiStorageCompression extends TestCase {
 
    private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
@@ -59,8 +66,8 @@ public class TestMultiStorageCompression
       filesToDelete.add(outputPath);
 
       try {
-         runQuery(outputPath, type);
-         verifyResults(type, filesToDelete, outputPath);
+         runQuery(outputPath, "0", type);
+         verifyResults(type, outputPath);
       } finally {
          cleanUpDirs(filesToDelete);
       }
@@ -77,22 +84,22 @@ public class TestMultiStorageCompression
       filesToDelete.add(outputPath);
 
       try {
-         runQuery(outputPath, type);
-         verifyResults(type, filesToDelete, outputPath);
+         runQuery(outputPath, "0", type);
+         verifyResults(type, outputPath);
       } finally {
          cleanUpDirs(filesToDelete);
       }
    }
 
-   private void cleanUpDirs(List<String> filesToDelete) {
+   private void cleanUpDirs(List<String> filesToDelete) throws IOException {
       // Delete files recursively
       Collections.reverse(filesToDelete);
       for (String string : filesToDelete)
-         new File(string).delete();
+         FileUtils.deleteDirectory(new File(string));
    }
 
 
-   private void verifyResults(String type, List<String> filesToDelete,
+   private void verifyResults(String type,
          String outputPath) throws IOException, FileNotFoundException {
       // Verify the output
       File outputDir = new File(outputPath);
@@ -114,12 +121,10 @@ public class TestMultiStorageCompression
              continue;
          String topFolder = outputPath + File.separator + indexFolder;
          File indexFolderFile = new File(topFolder);
-         filesToDelete.add(topFolder);
          String[] list = indexFolderFile.list();
          for (String outputFile : list) {
 
             String file = topFolder + File.separator + outputFile;
-            filesToDelete.add(file);
 
             // Skip off any file starting with .
             if (outputFile.startsWith("."))
@@ -159,7 +164,7 @@ public class TestMultiStorageCompression
       }
    }
 
-   private void runQuery(String outputPath, String compressionType)
+   private void runQuery(String outputPath, String keyColIndices, String compressionType)
          throws Exception, ExecException, IOException, FrontendException {
 
       // create a data file
@@ -172,7 +177,7 @@ public class TestMultiStorageCompression
 
       String query2 = "STORE A INTO '" + Util.encodeEscape(outputPath)
             + "' USING org.apache.pig.piggybank.storage.MultiStorage" + "('"
-            + Util.encodeEscape(outputPath) + "','0', '" + compressionType + "', '\\t');";
+            + Util.encodeEscape(outputPath) + "','"+keyColIndices+"', '" + compressionType + "', '\\t');";
 
       // Run Pig
       pig.setBatchOn();
@@ -182,5 +187,32 @@ public class TestMultiStorageCompression
       pig.executeBatch();
    }
 
+   public void testMultiStorageShouldSupportMultiLevelAndGz() throws Exception {
+      String type = "gz";
+      String outputDir = "output001.multi." + type;
+      List<String> filesToDelete = new ArrayList<String>();
+
+      String tmpDir = System.getProperty("java.io.tmpdir");
+      String outputPath = tmpDir + File.separator + outputDir;
+
+      filesToDelete.add(outputPath);
+      try {
+         runQuery(outputPath, "1,0", type);
+         Collection<File> fileList = FileUtils.listFiles(new File(outputPath),null,true);
+         Set<String> expectedPaths = Sets.newHashSet( "output001.multi.gz/a.gz/f1.gz/a-f1-0,000.gz",
+                                                      "output001.multi.gz/b.gz/f2.gz/b-f2-0,000.gz",
+                                                      "output001.multi.gz/c.gz/f3.gz/c-f3-0,000.gz",
+                                                      "output001.multi.gz/d.gz/f4.gz/d-f4-0,000.gz");
+         for (File file : fileList){
+            String foundPath = file.getAbsolutePath().substring(file.getAbsolutePath().indexOf(outputDir));
+            if (expectedPaths.contains(foundPath)){
+               expectedPaths.remove(foundPath);
+            }
+         }
+         Assert.assertTrue(expectedPaths.isEmpty());
+      } finally {
+         cleanUpDirs(filesToDelete);
+      }
+   }
 
 }
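
A side note on the expected paths above: the "0,000" segment is not a typo. NumberFormat.getInstance() applies locale-dependent digit grouping on top of the four minimum integer digits requested in createOutputStream, so task id 0 is rendered with a grouping separator. A quick sketch, assuming a locale that groups with ',':

    import java.text.NumberFormat;

    public class TaskIdFormatSketch {
      public static void main(String[] args) {
        NumberFormat nf = NumberFormat.getInstance(); // grouping is on by default
        nf.setMinimumIntegerDigits(4);
        System.out.println(nf.format(0)); // "0,000" in e.g. Locale.US
      }
    }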

