gates
Fri, 30 Oct 2009 14:45:17 -0700
Author: gates
Date: Fri Oct 30 21:44:51 2009
New Revision: 831462
URL: http://svn.apache.org/viewvc?rev=831462&view=rev
Log:
PIG-1057 Zebra does not support concurrent deletions of column groups now.
Modified:
hadoop/pig/trunk/contrib/zebra/CHANGES.txt
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=831462&r1=831461&r2=831462&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Fri Oct 30 21:44:51 2009
@@ -18,6 +18,9 @@
BUG FIXES
+ PIG-1057 Zebra does not support concurrent deletions of column groups now
+ (chaow via gates).
+
PIG-944 Change schema to be taken from StoreConfig instead of
TableStorer's constructor (yanz via gates).
Modified:
hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=831462&r1=831461&r2=831462&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Fri Oct 30 21:44:51 2009
@@ -21,6 +21,7 @@
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
@@ -127,8 +128,25 @@
throws IOException {
FileSystem fs = FileSystem.get(conf);
+ int triedCount = 0;
+ int numCGs = SchemaFile.getNumCGs(path, conf);
+ SchemaFile schemaFile = null;
- SchemaFile schemaFile = new SchemaFile(path, conf);
+ /* Retry up to numCGs times accounting for other CG deleting threads or processes.*/
+ while (triedCount ++ < numCGs) {
+ try {
+ schemaFile = new SchemaFile(path, conf);
+ break;
+ } catch (FileNotFoundException e) {
+ LOG.info("Try " + triedCount + " times : " + e.getMessage());
+ } catch (Exception e) {
+ throw new IOException ("Cannot construct SchemaFile : " + e.getMessage());
+ }
+ }
+
+ if (schemaFile == null) {
+ throw new IOException ("Cannot construct SchemaFile");
+ }
int cgIdx = schemaFile.getCGByName(cgName);
if (cgIdx < 0) {
@@ -137,9 +155,8 @@
}
Path cgPath = new Path(path, schemaFile.getName(cgIdx));
-
- //Clean up any previous unfinished attempts to drop column groups?
-
+
+ //Clean up any previous unfinished attempts to drop column groups?
if (schemaFile.isCGDeleted(cgIdx)) {
// Clean up unfinished delete if it exists. so that clean up can
// complete if the previous deletion was interrupted for some reason.
@@ -1480,9 +1497,9 @@
return cgschemas[nx].getCompressor();
}
- /**
- * Returns the index for CG with the given name.
- * -1 indicates that there is no CG with the name.
+ /**
+ * Returns the index for CG with the given name. -1 indicates that there is
+ * no CG with the name.
*/
int getCGByName(String cgName) {
for(int i=0; i<physical.length; i++) {
@@ -1592,6 +1609,32 @@
in.close();
}
+ /**
+  * Reads just the number of column groups recorded in the table's schema
+  * file, without constructing a full SchemaFile.
+  *
+  * The file is read in order: version header, comparator string, logical
+  * schema string, storage string, then the column group count as a VInt.
+  *
+  * @param path the table path; the schema file is located via
+  *             makeSchemaFilePath(path)
+  * @param conf configuration used to resolve the file system for path
+  * @return the column group count stored in the schema file
+  * @throws IOException if the schema file does not exist, its version is not
+  *                     compatible with SCHEMA_VERSION, or reading fails
+  */
+ private static int getNumCGs(Path path, Configuration conf) throws IOException {
+   Path pathSchema = makeSchemaFilePath(path);
+   FileSystem fs = path.getFileSystem(conf);
+   if (!fs.exists(pathSchema)) {
+     throw new IOException("BT Schema file doesn't exist: " + pathSchema);
+   }
+   // read schema file
+   FSDataInputStream in = fs.open(pathSchema);
+   try {
+     Version version = new Version(in);
+     // verify compatibility against SCHEMA_VERSION; the exception must be
+     // thrown, not merely constructed, or a mismatch goes unnoticed
+     if (!version.compatibleWith(SCHEMA_VERSION)) {
+       throw new IOException("Incompatible versions, expecting: " + SCHEMA_VERSION
+           + "; found in file: " + version);
+     }
+
+     // read (and discard) comparator
+     WritableUtils.readString(in);
+     // read (and discard) logicalStr
+     WritableUtils.readString(in);
+     // read (and discard) storage
+     WritableUtils.readString(in);
+     return WritableUtils.readVInt(in);
+   } finally {
+     // release the stream even when a read or the version check fails
+     in.close();
+   }
+ }
+
/**
 * Returns the path of the schema file for a table: BT_SCHEMA_FILE resolved
 * as a child of the given parent path.
 */
private static Path makeSchemaFilePath(Path parent) {
return new Path(parent, BT_SCHEMA_FILE);
}
@@ -1607,23 +1650,24 @@
for (FileStatus file : path.getFileSystem(conf).listStatus(path)) {
if (!file.isDir()) {
- String fname = file.getPath().getName();
- if (fname.startsWith(DELETED_CG_PREFIX)) {
- deletedCGs.add(fname.substring(DELETED_CG_PREFIX.length()));
- }
+ String fname = file.getPath().getName();
+ if (fname.startsWith(DELETED_CG_PREFIX)) {
+ deletedCGs.add(fname.substring(DELETED_CG_PREFIX.length()));
+ }
}
}
for(int i=0; i<physical.length; i++) {
- cgDeletedFlags[i] =
- deletedCGs.contains(getName(i));
+ cgDeletedFlags[i] = deletedCGs.contains(getName(i));
}
}
+
+
}
static public void dumpInfo(String file, PrintStream out, Configuration conf)
throws IOException {
- dumpInfo(file, out, conf, 0);
+ dumpInfo(file, out, conf, 0);
}
static public void dumpInfo(String file, PrintStream out, Configuration conf, int indent)
Modified:
hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
URL:
http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java?rev=831462&r1=831461&r2=831462&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java Fri Oct 30 21:44:51 2009
@@ -27,6 +27,7 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.zebra.parser.ParseException;
@@ -48,6 +49,7 @@
Log LOG = LogFactory.getLog(TestDropColumnGroup.class);
private static Path path;
private static Configuration conf;
+ private static FileSystem fs;
@BeforeClass
public static void setUpOnce() throws IOException {
@@ -55,7 +57,7 @@
path = new Path(TestBasicTable.rootPath, "DropCGTest");
conf = TestBasicTable.conf;
Log LOG = LogFactory.getLog(TestDropColumnGroup.class);
-
+ fs = path.getFileSystem(conf);
}
@AfterClass
@@ -114,8 +116,10 @@
* columns can be read the value returned is null.
*/
- BasicTable.drop(path, conf);
-
+ if (fs.exists(path)) {
+ BasicTable.drop(path, conf);
+ }
+
int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
"[a, b]; [c, d]", path, true, false);
@@ -195,12 +199,13 @@
+ "[c] as collectionCG; "
+ "[r1.f2, m1#{b}, m2#{z}] as mapRecordCG; ";
- Path path = new Path(TestBasicTable.rootPath, "DropCGTest");
+ //Path path = new Path(TestBasicTable.rootPath, "DropCGTest");
Configuration conf = TestBasicTable.conf;
conf.set("fs.default.name", "file:///");
-
- BasicTable.drop(path, conf);
-
+ if (fs.exists(path)) {
+ BasicTable.drop(path, conf);
+ }
+
// first write the table :
BasicTable.Writer writer = new BasicTable.Writer(path, mixedSchema,
mixedStorageHint, false, conf);
@@ -394,25 +399,35 @@
/*
* Tests concurrent drop CGs
*/
+ if (fs.exists(path)) {
+ BasicTable.drop(path, conf);
+ }
- BasicTable.drop(path, conf);
-
- int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
- "[a];[b];[c];[d];[e];[f]", path, true, false);
- System.out.println("Frist dump:");
+ int numRows = TestBasicTable.createBasicTable(1, 10, "f1,f2,f3,f4,f5,f6,f7,f8,f9,f10," +
+ "f11,f12,f13,f14,f15,f16,f17,f18,f19,f20," +
+ "f21,f22,f23,f24,f25,f26,f27,f28,f29,f30," +
+ "f31,f32,f33,f34,f35,f36,f37,f38,f39,f40," +
+ "f41,f42,f43,f44,f45,f46,f47,f48,f49,f50",
+ "[f1];[f2];[f3];[f4];[f5];[f6];[f7];[f8];[f9];[f10];" +
+ "[f11];[f12];[f13];[f14];[f15];[f16];[f17];[f18];[f19];[f20];" +
+ "[f21];[f22];[f23];[f24];[f25];[f26];[f27];[f28];[f29];[f30];" +
+ "[f31];[f32];[f33];[f34];[f35];[f36];[f37];[f38];[f39];[f40];" +
+ "[f41];[f42];[f43];[f44];[f45];[f46];[f47];[f48];[f49];[f50]",
+ path, true, false);
+
+ System.out.println("First dump:");
BasicTable.dumpInfo(path.toString(), System.out, conf);
int rowsToRead = Math.min(10, numRows);
// normal table.
- verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+ verifyScanner(path, conf, "f1, f3, xx", new boolean[] { false, false, true },
rowsToRead);
// create a thread for each dropCG
- DropThread[] threads = new DropThread[6];
+ DropThread[] threads = new DropThread[50];
for (int i = 0; i < threads.length; i++) {
-
- threads[i] = new DropThread(i);
+ threads[i] = new DropThread(i, 50);
}
// start the threads
@@ -430,17 +445,16 @@
// check various read cases.
- verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, true,
+ verifyScanner(path, conf, "f3, f1, f2, f6, f4, f5", new boolean[] { true, true,
true, true, true, true }, rowsToRead);
System.out.println("second dump");
BasicTable.dumpInfo(path.toString(), System.out, conf);
// Now make sure the reader reports zero rows.
- Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+ Assert.assertTrue(countRows(path, conf, "f3, f5, f2") == 0);
// delete the table
BasicTable.drop(path, conf);
-
}
@Test
@@ -448,25 +462,35 @@
/*
* Tests concurrrent drop CGs while one fails
*/
+ if (fs.exists(path)) {
+ BasicTable.drop(path, conf);
+ }
- BasicTable.drop(path, conf);
-
- int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
- "[a];[b];[c];[d];[e];[f]", path, true, false);
- System.out.println("Frist dump:");
+ int numRows = TestBasicTable.createBasicTable(1, 10, "f1,f2,f3,f4,f5,f6,f7,f8,f9,f10," +
+ "f11,f12,f13,f14,f15,f16,f17,f18,f19,f20," +
+ "f21,f22,f23,f24,f25,f26,f27,f28,f29,f30," +
+ "f31,f32,f33,f34,f35,f36,f37,f38,f39,f40," +
+ "f41,f42,f43,f44,f45,f46,f47,f48,f49,f50",
+ "[f1];[f2];[f3];[f4];[f5];[f6];[f7];[f8];[f9];[f10];" +
+ "[f11];[f12];[f13];[f14];[f15];[f16];[f17];[f18];[f19];[f20];" +
+ "[f21];[f22];[f23];[f24];[f25];[f26];[f27];[f28];[f29];[f30];" +
+ "[f31];[f32];[f33];[f34];[f35];[f36];[f37];[f38];[f39];[f40];" +
+ "[f41];[f42];[f43];[f44];[f45];[f46];[f47];[f48];[f49];[f50]",
+ path, true, false);
+
+ System.out.println("First dump:");
BasicTable.dumpInfo(path.toString(), System.out, conf);
int rowsToRead = Math.min(10, numRows);
// normal table.
- verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+ verifyScanner(path, conf, "f1, f3, xx", new boolean[] { false, false, true },
rowsToRead);
// create a thread for each dropCG
- DropThread[] threads = new DropThread[7];
+ DropThread[] threads = new DropThread[60];
for (int i = 0; i < threads.length; i++) {
-
- threads[i] = new DropThread(i);
+ threads[i] = new DropThread(i, 50);
}
// start the threads
@@ -478,18 +502,19 @@
try {
thr.join();
} catch (InterruptedException e) {
+ e.printStackTrace();
}
}
// check various read cases.
- verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, true,
+ verifyScanner(path, conf, "f3, f1, f2, f6, f4, f5", new boolean[] { true, true,
true, true, true, true }, rowsToRead);
System.out.println("second dump");
BasicTable.dumpInfo(path.toString(), System.out, conf);
// Now make sure the reader reports zero rows.
- Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+ Assert.assertTrue(countRows(path, conf, "f3, f5, f2") == 0);
// delete the table
BasicTable.drop(path, conf);
@@ -502,10 +527,13 @@
*/
System.out.println("######int test 5");
- BasicTable.drop(path, conf);
+
+ if (fs.exists(path)) {
+ BasicTable.drop(path, conf);
+ }
int numRows = TestBasicTable.createBasicTable(1, 100000,
- "a, b, c, d, e, f", "[a, b]; [c, d]", path, true, false);
+ "a, b, c, d, e, f, g, h, i, j, k, l, m, n", "[a, b]; [c, d]; [e]; [f]; [g]; [h]; [i]; [j]; [k]; [l]; [m]; [n]", path, true, false);
System.out.println("in test5 , dump infor 1");
BasicTable.dumpInfo(path.toString(), System.out, conf);
@@ -519,23 +547,21 @@
rowsToRead);
// create a thread for each dropCG
- DropThread[] dropThreads = new DropThread[3];
+ DropThread[] dropThreads = new DropThread[12];
for (int i = 0; i < dropThreads.length; i++) {
-
- dropThreads[i] = new DropThread(i);
+ dropThreads[i] = new DropThread(i, 12);
}
// start the threads
for (int j = 0; j < dropThreads.length; j++) {
dropThreads[j].start();
}
-
+
// create read threads
ReadThread[] readThreads = new ReadThread[numOfReadThreads];
for (int i = 0; i < readThreads.length; i++) {
-
readThreads[i] = new ReadThread(i, "a, b, c, d, e, f", 1000);
}
@@ -551,6 +577,7 @@
e.printStackTrace();
}
}
+
for (Thread thr : readThreads) {
try {
thr.join();
@@ -569,7 +596,6 @@
// delete the table
BasicTable.drop(path, conf);
-
}
@Test
@@ -593,9 +619,10 @@
/*
* Tests API, path is wrong
*/
-
- BasicTable.drop(path, conf);
-
+ if (fs.exists(path)) {
+ BasicTable.drop(path, conf);
+ }
+
TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
"[a];[b];[c];[d];[e];[f]", path, true, false);
Path wrongPath = new Path(path.toString() + "non-existing");
@@ -730,17 +757,17 @@
BasicTable.drop(path1, conf);
}
- /**
+/**
* A thread that performs a DropColumnGroup.
*/
class DropThread extends Thread {
private int id;
+ private int cntCGs;
- public DropThread(int id) {
-
+ public DropThread(int id, int cntCGs) {
this.id = id;
-
+ this.cntCGs = cntCGs;
}
/**
@@ -748,13 +775,22 @@
*/
public void run() {
try {
- System.out.println("Droping CG: " + id);
- BasicTable.dropColumnGroup(path, conf, "CG" + id);
+ int total = cntCGs;
+ int digits = 1;
+ while (total >= 10) {
+ ++ digits;
+ total /= 10;
+ }
+ String formatString = "%0" + digits + "d";
+ String str = "CG" + String.format(formatString, id);
+
+ System.out.println(id + ": Droping CG: " + str);
+ BasicTable.dropColumnGroup(path, conf, str);
} catch (Exception e) {
System.out.println(id + " - error: " + e);
+ e.printStackTrace();
}
}
-
}
/**