Author: daijy
Date: Thu Dec 1 21:19:02 2016
New Revision: 1772278
URL: http://svn.apache.org/viewvc?rev=1772278&view=rev
Log:
PIG-4901: To use Multistorage for each Group
Modified:
pig/trunk/CHANGES.txt
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1772278&r1=1772277&r2=1772278&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Dec 1 21:19:02 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
IMPROVEMENTS
+PIG-4901: To use Multistorage for each Group (szita via daijy)
+
PIG-5025: Fix flaky test failures in TestLoad.java (szita via rohini)
PIG-4939: QueryParserUtils.setHdfsServers(QueryParserUtils.java:104) should
not be called for non-dfs
Modified:
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
URL:
http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java?rev=1772278&r1=1772277&r2=1772278&view=diff
==============================================================================
---
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
(original)
+++
pig/trunk/contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/MultiStorage.java
Thu Dec 1 21:19:02 2016
@@ -16,7 +16,9 @@ import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.text.NumberFormat;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
@@ -42,6 +44,9 @@ import org.apache.pig.backend.executione
import
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.StorageUtil;
+import org.apache.xml.utils.StringBufferPool;
+
+import com.google.common.base.Strings;
/**
* The UDF is useful for splitting the output data into a bunch of directories
@@ -73,13 +78,21 @@ import org.apache.pig.impl.util.StorageU
* If the output is compressed,then the sub directories and the output files will
* be having the extension. Say for example in the above case if bz2 is used one file
* will look like ;/my/home/output.bz2/a1.bz2/a1-0000.bz2
+ *
+ * Key field can also be a comma separated list of indices e.g. '0,1' - in this case
+ * storage will be multi-level:
+ * /my/home/output/a1/b1/a1-b1-0000
+ * /my/home/output/a1/b2/a1-b2-0000
+ * There is also an option to leave key values out of storage, see isRemoveKeys.
*/
public class MultiStorage extends StoreFunc {
+ private static final String KEYFIELD_DELIMETER = ",";
private Path outputPath; // User specified output Path
- private int splitFieldIndex = -1; // Index of the key field
+ private final List<Integer> splitFieldIndices= new ArrayList<Integer>(); //
Indices of the key fields
private String fieldDel; // delimiter of the output record.
private Compression comp; // Compression type of output data.
+ private boolean isRemoveKeys = false;
// Compression types supported by this store
enum Compression {
@@ -95,9 +108,14 @@ public class MultiStorage extends StoreF
this(parentPathStr, splitFieldIndex, compression, "\\t");
}
+ public MultiStorage(String parentPathStr, String splitFieldIndex,
+ String compression, String fieldDel) {
+ this(parentPathStr, splitFieldIndex, compression, fieldDel, "false");
+ }
+
/**
* Constructor
- *
+ *
* @param parentPathStr
* Parent output dir path (this will be specified in store
statement,
* so MultiStorage don't use this parameter in reality. However,
we don't
@@ -108,18 +126,26 @@ public class MultiStorage extends StoreF
* 'bz2', 'bz', 'gz' or 'none'
* @param fieldDel
* Output record field delimiter.
+ * @param isRemoveKeys
+ * Removes key columns from result during write.
*/
public MultiStorage(String parentPathStr, String splitFieldIndex,
- String compression, String fieldDel) {
+ String compression, String fieldDel, String
isRemoveKeys) {
+ this.isRemoveKeys = Boolean.parseBoolean(isRemoveKeys);
this.outputPath = new Path(parentPathStr);
- this.splitFieldIndex = Integer.parseInt(splitFieldIndex);
+
+ String[] splitFieldIndices = splitFieldIndex.split(KEYFIELD_DELIMETER);
+ for (String splitFieldIndexString : splitFieldIndices){
+ this.splitFieldIndices.add(Integer.parseInt(splitFieldIndexString));
+ }
+
this.fieldDel = fieldDel;
try {
this.comp = (compression == null) ? Compression.none : Compression
- .valueOf(compression.toLowerCase());
+ .valueOf(compression.toLowerCase());
} catch (IllegalArgumentException e) {
System.err.println("Exception when converting compression string: "
- + compression + " to enum. No compression will be used");
+ + compression + " to enum. No compression will be used");
this.comp = Compression.none;
}
}
@@ -127,22 +153,26 @@ public class MultiStorage extends StoreF
//--------------------------------------------------------------------------
// Implementation of StoreFunc
- private RecordWriter<String, Tuple> writer;
+ private RecordWriter<List<String>, Tuple> writer;
@Override
public void putNext(Tuple tuple) throws IOException {
- if (tuple.size() <= splitFieldIndex) {
- throw new IOException("split field index:" + this.splitFieldIndex
- + " >= tuple size:" + tuple.size());
+ for (int splitFieldIndex : this.splitFieldIndices) {
+ if (tuple.size() <= splitFieldIndex) {
+ throw new IOException("split field index:" + splitFieldIndex
+ + " >= tuple size:" + tuple.size());
+ }
}
- Object field = null;
- try {
- field = tuple.get(splitFieldIndex);
- } catch (ExecException exec) {
- throw new IOException(exec);
+ List<String> fields = new ArrayList<String>();
+ for (int splitFieldIndex : this.splitFieldIndices){
+ try {
+ fields.add(String.valueOf(tuple.get(splitFieldIndex)));
+ } catch (ExecException exec) {
+ throw new IOException(exec);
+ }
}
try {
- writer.write(String.valueOf(field), tuple);
+ writer.write(fields, tuple);
} catch (InterruptedException e) {
throw new IOException(e);
}
@@ -153,6 +183,9 @@ public class MultiStorage extends StoreF
public OutputFormat getOutputFormat() throws IOException {
MultiStorageOutputFormat format = new MultiStorageOutputFormat();
format.setKeyValueSeparator(fieldDel);
+ if (this.isRemoveKeys){
+ format.setSkipIndices(this.splitFieldIndices);
+ }
return format;
}
@@ -184,22 +217,23 @@ public class MultiStorage extends StoreF
// Implementation of OutputFormat
public static class MultiStorageOutputFormat extends
- TextOutputFormat<String, Tuple> {
+ TextOutputFormat<List<String>, Tuple> {
private String keyValueSeparator = "\\t";
private byte fieldDel = '\t';
-
+ private List<Integer> skipIndices = null;
+
@Override
- public RecordWriter<String, Tuple>
+ public RecordWriter<List<String>, Tuple>
getRecordWriter(TaskAttemptContext context
) throws IOException, InterruptedException {
final TaskAttemptContext ctx = context;
- return new RecordWriter<String, Tuple>() {
+ return new RecordWriter<List<String>, Tuple>() {
- private Map<String, MyLineRecordWriter> storeMap =
- new HashMap<String, MyLineRecordWriter>();
+ private Map<List<String>, MyLineRecordWriter> storeMap =
+ new HashMap<List<String>, MyLineRecordWriter>();
private static final int BUFFER_SIZE = 1024;
@@ -207,7 +241,7 @@ public class MultiStorage extends StoreF
new ByteArrayOutputStream(BUFFER_SIZE);
@Override
- public void write(String key, Tuple val) throws IOException {
+ public void write(List<String> key, Tuple val) throws IOException {
int sz = val.size();
for (int i = 0; i < sz; i++) {
Object field;
@@ -217,9 +251,13 @@ public class MultiStorage extends StoreF
throw ee;
}
- StorageUtil.putField(mOut, field);
+ boolean skipCurrentField = skipIndices != null &&
skipIndices.contains(i);
+
+ if (!skipCurrentField) {
+ StorageUtil.putField(mOut, field);
+ }
- if (i != sz - 1) {
+ if (i != sz - 1 && !skipCurrentField) {
mOut.write(fieldDel);
}
}
@@ -236,17 +274,17 @@ public class MultiStorage extends StoreF
}
}
- private MyLineRecordWriter getStore(String fieldValue) throws
IOException {
- MyLineRecordWriter store = storeMap.get(fieldValue);
+ private MyLineRecordWriter getStore(List<String> fieldValues) throws
IOException {
+ MyLineRecordWriter store = storeMap.get(fieldValues);
if (store == null) {
- DataOutputStream os = createOutputStream(fieldValue);
+ DataOutputStream os = createOutputStream(fieldValues);
store = new MyLineRecordWriter(os, keyValueSeparator);
- storeMap.put(fieldValue, store);
+ storeMap.put(fieldValues, store);
}
return store;
}
- private DataOutputStream createOutputStream(String fieldValue) throws
IOException {
+ private DataOutputStream createOutputStream(List<String> fieldValues)
throws IOException {
Configuration conf = ctx.getConfiguration();
TaskID taskId = ctx.getTaskAttemptID().getTaskID();
@@ -264,7 +302,21 @@ public class MultiStorage extends StoreF
NumberFormat nf = NumberFormat.getInstance();
nf.setMinimumIntegerDigits(4);
- Path path = new Path(fieldValue+extension, fieldValue + '-'
+ StringBuffer pathStringBuffer = new StringBuffer();
+ for (String fieldValue : fieldValues){
+ String safeFieldValue = fieldValue.replaceAll("\\/","-");
+ pathStringBuffer.append(safeFieldValue);
+ pathStringBuffer.append("/");
+ }
+ pathStringBuffer.deleteCharAt(pathStringBuffer.length()-1);
+ String pathString = pathStringBuffer.toString();
+ String idString = pathString.replaceAll("\\/","-");
+
+ if (!Strings.isNullOrEmpty(extension)){
+ pathString = pathString.replaceAll("\\/",extension+"\\/");
+ }
+
+ Path path = new Path(pathString+extension, idString + '-'
+ nf.format(taskId.getId())+extension);
Path workOutputPath =
((FileOutputCommitter)getOutputCommitter(ctx)).getWorkPath();
Path file = new Path(workOutputPath, path);
@@ -284,8 +336,12 @@ public class MultiStorage extends StoreF
keyValueSeparator = sep;
fieldDel = StorageUtil.parseFieldDel(keyValueSeparator);
}
-
- //------------------------------------------------------------------------
+
+ public void setSkipIndices(List<Integer> skipIndices) {
+ this.skipIndices = skipIndices;
+ }
+
+ //------------------------------------------------------------------------
//
protected static class MyLineRecordWriter
Modified:
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java
URL:
http://svn.apache.org/viewvc/pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java?rev=1772278&r1=1772277&r2=1772278&view=diff
==============================================================================
---
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java
(original)
+++
pig/trunk/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestMultiStorageCompression.java
Thu Dec 1 21:19:02 2016
@@ -21,11 +21,14 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Set;
import junit.framework.TestCase;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.BZip2Codec;
@@ -37,6 +40,10 @@ import org.apache.pig.backend.executione
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.test.Util;
+import com.google.common.collect.Sets;
+
+import org.junit.Assert;
+
public class TestMultiStorageCompression extends TestCase {
private static String patternString = "(\\d+)!+(\\w+)~+(\\w+)";
@@ -59,8 +66,8 @@ public class TestMultiStorageCompression
filesToDelete.add(outputPath);
try {
- runQuery(outputPath, type);
- verifyResults(type, filesToDelete, outputPath);
+ runQuery(outputPath, "0", type);
+ verifyResults(type, outputPath);
} finally {
cleanUpDirs(filesToDelete);
}
@@ -77,22 +84,22 @@ public class TestMultiStorageCompression
filesToDelete.add(outputPath);
try {
- runQuery(outputPath, type);
- verifyResults(type, filesToDelete, outputPath);
+ runQuery(outputPath, "0", type);
+ verifyResults(type, outputPath);
} finally {
cleanUpDirs(filesToDelete);
}
}
- private void cleanUpDirs(List<String> filesToDelete) {
+ private void cleanUpDirs(List<String> filesToDelete) throws IOException {
// Delete files recursively
Collections.reverse(filesToDelete);
for (String string : filesToDelete)
- new File(string).delete();
+ FileUtils.deleteDirectory(new File(string));
}
- private void verifyResults(String type, List<String> filesToDelete,
+ private void verifyResults(String type,
String outputPath) throws IOException, FileNotFoundException {
// Verify the output
File outputDir = new File(outputPath);
@@ -114,12 +121,10 @@ public class TestMultiStorageCompression
continue;
String topFolder = outputPath + File.separator + indexFolder;
File indexFolderFile = new File(topFolder);
- filesToDelete.add(topFolder);
String[] list = indexFolderFile.list();
for (String outputFile : list) {
String file = topFolder + File.separator + outputFile;
- filesToDelete.add(file);
// Skip off any file starting with .
if (outputFile.startsWith("."))
@@ -159,7 +164,7 @@ public class TestMultiStorageCompression
}
}
- private void runQuery(String outputPath, String compressionType)
+ private void runQuery(String outputPath, String keyColIndices, String
compressionType)
throws Exception, ExecException, IOException, FrontendException {
// create a data file
@@ -172,7 +177,7 @@ public class TestMultiStorageCompression
String query2 = "STORE A INTO '" + Util.encodeEscape(outputPath)
+ "' USING org.apache.pig.piggybank.storage.MultiStorage" + "('"
- + Util.encodeEscape(outputPath) + "','0', '" + compressionType +
"', '\\t');";
+ + Util.encodeEscape(outputPath) + "','"+keyColIndices+"', '" +
compressionType + "', '\\t');";
// Run Pig
pig.setBatchOn();
@@ -182,5 +187,32 @@ public class TestMultiStorageCompression
pig.executeBatch();
}
+ public void testMultiStorageShouldSupportMultiLevelAndGz() throws Exception
{
+ String type = "gz";
+ String outputDir = "output001.multi." + type;
+ List<String> filesToDelete = new ArrayList<String>();
+
+ String tmpDir = System.getProperty("java.io.tmpdir");
+ String outputPath = tmpDir + File.separator + outputDir;
+
+ filesToDelete.add(outputPath);
+ try {
+ runQuery(outputPath, "1,0", type);
+ Collection<File> fileList = FileUtils.listFiles(new
File(outputPath),null,true);
+ Set<String> expectedPaths = Sets.newHashSet(
"output001.multi.gz/a.gz/f1.gz/a-f1-0,000.gz",
+
"output001.multi.gz/b.gz/f2.gz/b-f2-0,000.gz",
+
"output001.multi.gz/c.gz/f3.gz/c-f3-0,000.gz",
+
"output001.multi.gz/d.gz/f4.gz/d-f4-0,000.gz");
+ for (File file : fileList){
+ String foundPath =
file.getAbsolutePath().substring(file.getAbsolutePath().indexOf(outputDir));
+ if (expectedPaths.contains(foundPath)){
+ expectedPaths.remove(foundPath);
+ }
+ }
+ Assert.assertTrue(expectedPaths.isEmpty());
+ } finally {
+ cleanUpDirs(filesToDelete);
+ }
+ }
}