pig-commits  

svn commit: r831462 - in /hadoop/pig/trunk/contrib/zebra: CHANGES.txt src/java/org/apache/hadoop/zebra/io/BasicTable.java src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java

gates
Fri, 30 Oct 2009 14:45:17 -0700

Author: gates
Date: Fri Oct 30 21:44:51 2009
New Revision: 831462

URL: http://svn.apache.org/viewvc?rev=831462&view=rev
Log:
PIG-1057 Zebra does not support concurrent deletions of column groups now.

Modified:
    hadoop/pig/trunk/contrib/zebra/CHANGES.txt
    hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
    hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java

Modified: hadoop/pig/trunk/contrib/zebra/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/CHANGES.txt?rev=831462&r1=831461&r2=831462&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/CHANGES.txt (original)
+++ hadoop/pig/trunk/contrib/zebra/CHANGES.txt Fri Oct 30 21:44:51 2009
@@ -18,6 +18,9 @@
 
   BUG FIXES
 
+       PIG-1057 Zebra does not support concurrent deletions of column groups now
+       (chaow via gates).
+
        PIG-944  Change schema to be taken from StoreConfig instead of
        TableStorer's constructor (yanz via gates).
 

Modified: hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java?rev=831462&r1=831461&r2=831462&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/java/org/apache/hadoop/zebra/io/BasicTable.java Fri Oct 30 21:44:51 2009
@@ -21,6 +21,7 @@
 import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
@@ -127,8 +128,25 @@
                                      throws IOException {
     
     FileSystem fs = FileSystem.get(conf);
+    int triedCount = 0;
+    int numCGs =  SchemaFile.getNumCGs(path, conf);
+    SchemaFile schemaFile = null;
     
-    SchemaFile schemaFile = new SchemaFile(path, conf);
+    /* Retry up to numCGs times accounting for other CG deleting threads or processes.*/
+    while (triedCount ++ < numCGs) {
+      try {
+        schemaFile = new SchemaFile(path, conf);
+        break;
+      } catch (FileNotFoundException e) {
+        LOG.info("Try " + triedCount + " times : " + e.getMessage());
+      } catch (Exception e) {
+        throw new IOException ("Cannot construct SchemaFile : " + e.getMessage());
+      }
+    }
+    
+    if (schemaFile == null) {
+      throw new IOException ("Cannot construct SchemaFile");
+    }
     
     int cgIdx = schemaFile.getCGByName(cgName);
     if (cgIdx < 0) {
@@ -137,9 +155,8 @@
     }
     
     Path cgPath = new Path(path, schemaFile.getName(cgIdx));
-    
-    //Clean up any previous unfinished attempts to drop column groups?
-    
+        
+    //Clean up any previous unfinished attempts to drop column groups?    
     if (schemaFile.isCGDeleted(cgIdx)) {
       // Clean up unfinished delete if it exists. so that clean up can 
       // complete if the previous deletion was interrupted for some reason.
@@ -1480,9 +1497,9 @@
       return cgschemas[nx].getCompressor();
     }
 
-    /** 
-     * Returns the index for CG with the given name.
-     * -1 indicates that there is no CG with the name.
+    /**
+     * Returns the index for CG with the given name. -1 indicates that there is
+     * no CG with the name.
      */
     int getCGByName(String cgName) {
       for(int i=0; i<physical.length; i++) {
@@ -1592,6 +1609,32 @@
       in.close();
     }
 
+    private static int getNumCGs(Path path, Configuration conf) throws IOException {
+      Path pathSchema = makeSchemaFilePath(path);
+      if (!path.getFileSystem(conf).exists(pathSchema)) {
+        throw new IOException("BT Schema file doesn't exist: " + pathSchema);
+      }
+      // read schema file
+      FSDataInputStream in = path.getFileSystem(conf).open(pathSchema);
+      Version version = new Version(in);
+      // verify compatibility against SCHEMA_VERSION
+      if (!version.compatibleWith(SCHEMA_VERSION)) {
+        new IOException("Incompatible versions, expecting: " + SCHEMA_VERSION
+            + "; found in file: " + version);
+      }
+      
+      // read comparator
+      WritableUtils.readString(in);
+      // read logicalStr
+      WritableUtils.readString(in);
+      // read storage
+      WritableUtils.readString(in);
+      int numCGs = WritableUtils.readVInt(in);
+      in.close();
+
+      return numCGs;
+    }
+
     private static Path makeSchemaFilePath(Path parent) {
       return new Path(parent, BT_SCHEMA_FILE);
     }
@@ -1607,23 +1650,24 @@
       
       for (FileStatus file : path.getFileSystem(conf).listStatus(path)) {
         if (!file.isDir()) {
-           String fname =  file.getPath().getName();
-           if (fname.startsWith(DELETED_CG_PREFIX)) {
-             deletedCGs.add(fname.substring(DELETED_CG_PREFIX.length()));
-           }
+          String fname =  file.getPath().getName();
+          if (fname.startsWith(DELETED_CG_PREFIX)) {
+            deletedCGs.add(fname.substring(DELETED_CG_PREFIX.length()));
+          }
         }
       }
       
       for(int i=0; i<physical.length; i++) {
-        cgDeletedFlags[i] = 
-          deletedCGs.contains(getName(i));
+        cgDeletedFlags[i] = deletedCGs.contains(getName(i));
       }
     }
+    
+    
   }
 
   static public void dumpInfo(String file, PrintStream out, Configuration conf)
       throws IOException {
-      dumpInfo(file, out, conf, 0);
+    dumpInfo(file, out, conf, 0);
   }
 
  static public void dumpInfo(String file, PrintStream out, Configuration conf, int indent)

Modified: hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java?rev=831462&r1=831461&r2=831462&view=diff
==============================================================================
--- hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java (original)
+++ hadoop/pig/trunk/contrib/zebra/src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java Fri Oct 30 21:44:51 2009
@@ -27,6 +27,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.zebra.parser.ParseException;
 
@@ -48,6 +49,7 @@
   Log LOG = LogFactory.getLog(TestDropColumnGroup.class);
   private static Path path;
   private static Configuration conf;
+  private static FileSystem fs;
 
   @BeforeClass
   public static void setUpOnce() throws IOException {
@@ -55,7 +57,7 @@
     path = new Path(TestBasicTable.rootPath, "DropCGTest");
     conf = TestBasicTable.conf;
     Log LOG = LogFactory.getLog(TestDropColumnGroup.class);
-
+    fs = path.getFileSystem(conf);
   }
 
   @AfterClass
@@ -114,8 +116,10 @@
      * columns can be read the value returned is null.
      */
 
-    BasicTable.drop(path, conf);
-
+    if (fs.exists(path)) {
+      BasicTable.drop(path, conf);
+    }
+    
     int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
         "[a, b]; [c, d]", path, true, false);
 
@@ -195,12 +199,13 @@
         + "[c]                           as collectionCG; "  
         + "[r1.f2, m1#{b}, m2#{z}]       as mapRecordCG; ";
 
-    Path path = new Path(TestBasicTable.rootPath, "DropCGTest");
+    //Path path = new Path(TestBasicTable.rootPath, "DropCGTest");
     Configuration conf = TestBasicTable.conf;
     conf.set("fs.default.name", "file:///");
-
-    BasicTable.drop(path, conf);
-
+    if (fs.exists(path)) {
+      BasicTable.drop(path, conf);
+    }
+    
     // first write the table :
     BasicTable.Writer writer = new BasicTable.Writer(path, mixedSchema,
         mixedStorageHint, false, conf);
@@ -394,25 +399,35 @@
     /*
      * Tests concurrent drop CGs
      */
+    if (fs.exists(path)) {
+      BasicTable.drop(path, conf);
+    }
 
-    BasicTable.drop(path, conf);
-
-    int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
-        "[a];[b];[c];[d];[e];[f]", path, true, false);
-    System.out.println("Frist dump:");
+    int numRows = TestBasicTable.createBasicTable(1, 10, "f1,f2,f3,f4,f5,f6,f7,f8,f9,f10," +
+                                                                "f11,f12,f13,f14,f15,f16,f17,f18,f19,f20," +
+                                                                "f21,f22,f23,f24,f25,f26,f27,f28,f29,f30," +
+                                                                "f31,f32,f33,f34,f35,f36,f37,f38,f39,f40," +
+                                                                "f41,f42,f43,f44,f45,f46,f47,f48,f49,f50",
+      "[f1];[f2];[f3];[f4];[f5];[f6];[f7];[f8];[f9];[f10];" +
+      "[f11];[f12];[f13];[f14];[f15];[f16];[f17];[f18];[f19];[f20];" +
+      "[f21];[f22];[f23];[f24];[f25];[f26];[f27];[f28];[f29];[f30];" +
+      "[f31];[f32];[f33];[f34];[f35];[f36];[f37];[f38];[f39];[f40];" +
+      "[f41];[f42];[f43];[f44];[f45];[f46];[f47];[f48];[f49];[f50]",
+      path, true, false);
+    
+    System.out.println("First dump:");
     BasicTable.dumpInfo(path.toString(), System.out, conf);
     int rowsToRead = Math.min(10, numRows);
 
     // normal table.
-    verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+    verifyScanner(path, conf, "f1, f3, xx", new boolean[] { false, false, true },
         rowsToRead);
 
     // create a thread for each dropCG
-    DropThread[] threads = new DropThread[6];
+    DropThread[] threads = new DropThread[50];
 
     for (int i = 0; i < threads.length; i++) {
-
-      threads[i] = new DropThread(i);
+      threads[i] = new DropThread(i, 50);
     }
 
     // start the threads
@@ -430,17 +445,16 @@
 
     // check various read cases.
 
-    verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, true,
+    verifyScanner(path, conf, "f3, f1, f2, f6, f4, f5", new boolean[] { true, true,
         true, true, true, true }, rowsToRead);
     System.out.println("second dump");
     BasicTable.dumpInfo(path.toString(), System.out, conf);
 
     // Now make sure the reader reports zero rows.
-    Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+    Assert.assertTrue(countRows(path, conf, "f3, f5, f2") == 0);
 
     // delete the table
     BasicTable.drop(path, conf);
-
   }
 
   @Test
@@ -448,25 +462,35 @@
     /*
      * Tests concurrrent drop CGs while one fails
      */
+    if (fs.exists(path)) {
+      BasicTable.drop(path, conf);
+    }
 
-    BasicTable.drop(path, conf);
-
-    int numRows = TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
-        "[a];[b];[c];[d];[e];[f]", path, true, false);
-    System.out.println("Frist dump:");
+    int numRows = TestBasicTable.createBasicTable(1, 10, "f1,f2,f3,f4,f5,f6,f7,f8,f9,f10," +
+        "f11,f12,f13,f14,f15,f16,f17,f18,f19,f20," +
+        "f21,f22,f23,f24,f25,f26,f27,f28,f29,f30," +
+        "f31,f32,f33,f34,f35,f36,f37,f38,f39,f40," +
+        "f41,f42,f43,f44,f45,f46,f47,f48,f49,f50",
+        "[f1];[f2];[f3];[f4];[f5];[f6];[f7];[f8];[f9];[f10];" +
+        "[f11];[f12];[f13];[f14];[f15];[f16];[f17];[f18];[f19];[f20];" +
+        "[f21];[f22];[f23];[f24];[f25];[f26];[f27];[f28];[f29];[f30];" +
+        "[f31];[f32];[f33];[f34];[f35];[f36];[f37];[f38];[f39];[f40];" +
+        "[f41];[f42];[f43];[f44];[f45];[f46];[f47];[f48];[f49];[f50]",
+        path, true, false);
+    
+    System.out.println("First dump:");
     BasicTable.dumpInfo(path.toString(), System.out, conf);
     int rowsToRead = Math.min(10, numRows);
 
     // normal table.
-    verifyScanner(path, conf, "a, c, x", new boolean[] { false, false, true },
+    verifyScanner(path, conf, "f1, f3, xx", new boolean[] { false, false, true },
         rowsToRead);
 
     // create a thread for each dropCG
-    DropThread[] threads = new DropThread[7];
+    DropThread[] threads = new DropThread[60];
 
     for (int i = 0; i < threads.length; i++) {
-
-      threads[i] = new DropThread(i);
+      threads[i] = new DropThread(i, 50);
     }
 
     // start the threads
@@ -478,18 +502,19 @@
       try {
         thr.join();
       } catch (InterruptedException e) {
+        e.printStackTrace();
       }
     }
 
     // check various read cases.
 
-    verifyScanner(path, conf, "c, a, b, f, d, e", new boolean[] { true, true,
+    verifyScanner(path, conf, "f3, f1, f2, f6, f4, f5", new boolean[] { true, true,
         true, true, true, true }, rowsToRead);
     System.out.println("second dump");
     BasicTable.dumpInfo(path.toString(), System.out, conf);
 
     // Now make sure the reader reports zero rows.
-    Assert.assertTrue(countRows(path, conf, "c, e, b") == 0);
+    Assert.assertTrue(countRows(path, conf, "f3, f5, f2") == 0);
 
     // delete the table
     BasicTable.drop(path, conf);
@@ -502,10 +527,13 @@
      */
 
     System.out.println("######int test 5");
-    BasicTable.drop(path, conf);
+
+    if (fs.exists(path)) {
+      BasicTable.drop(path, conf);
+    }
 
     int numRows = TestBasicTable.createBasicTable(1, 100000,
-        "a, b, c, d, e, f", "[a, b]; [c, d]", path, true, false);
+        "a, b, c, d, e, f, g, h, i, j, k, l, m, n", "[a, b]; [c, d]; [e]; [f]; [g]; [h]; [i]; [j]; [k]; [l]; [m]; [n]", path, true, false);
 
     System.out.println("in test5 , dump infor 1");
     BasicTable.dumpInfo(path.toString(), System.out, conf);
@@ -519,23 +547,21 @@
         rowsToRead);
 
     // create a thread for each dropCG
-    DropThread[] dropThreads = new DropThread[3];
+    DropThread[] dropThreads = new DropThread[12];
 
     for (int i = 0; i < dropThreads.length; i++) {
-
-      dropThreads[i] = new DropThread(i);
+      dropThreads[i] = new DropThread(i, 12);
     }
 
     // start the threads
     for (int j = 0; j < dropThreads.length; j++) {
       dropThreads[j].start();
     }
-
+    
     // create read threads
     ReadThread[] readThreads = new ReadThread[numOfReadThreads];
 
     for (int i = 0; i < readThreads.length; i++) {
-
       readThreads[i] = new ReadThread(i, "a, b, c, d, e, f", 1000);
     }
 
@@ -551,6 +577,7 @@
         e.printStackTrace();
       }
     }
+    
     for (Thread thr : readThreads) {
       try {
         thr.join();
@@ -569,7 +596,6 @@
 
     // delete the table
     BasicTable.drop(path, conf);
-
   }
 
   @Test
@@ -593,9 +619,10 @@
     /*
      * Tests API, path is wrong
      */
-
-    BasicTable.drop(path, conf);
-
+    if (fs.exists(path)) {
+      BasicTable.drop(path, conf);
+    }
+    
     TestBasicTable.createBasicTable(1, 10, "a, b, c, d, e, f",
         "[a];[b];[c];[d];[e];[f]", path, true, false);
     Path wrongPath = new Path(path.toString() + "non-existing");
@@ -730,17 +757,17 @@
     BasicTable.drop(path1, conf);
   }
 
-  /**
+/**
    * A thread that performs a DropColumnGroup.
    */
   class DropThread extends Thread {
 
     private int id;
+    private int cntCGs;
 
-    public DropThread(int id) {
-
+    public DropThread(int id, int cntCGs) {
       this.id = id;
-
+      this.cntCGs = cntCGs;
     }
 
     /**
@@ -748,13 +775,22 @@
      */
     public void run() {
       try {
-        System.out.println("Droping CG: " + id);
-        BasicTable.dropColumnGroup(path, conf, "CG" + id);
+        int total = cntCGs;
+        int digits = 1;
+        while (total >= 10) {
+          ++ digits;
+          total /= 10;
+        }
+        String formatString = "%0" + digits + "d";
+        String str = "CG"  + String.format(formatString, id);
+
+        System.out.println(id + ": Droping CG: " + str);
+        BasicTable.dropColumnGroup(path, conf, str);
       } catch (Exception e) {
         System.out.println(id + " - error: " + e);
+        e.printStackTrace();
       }
     }
-
   }
 
   /**


  • svn commit: r831462 - in /hadoop/pig/trunk/contrib/zebra: CHANGES.txt src/java/org/apache/hadoop/zebra/io/BasicTable.java src/test/org/apache/hadoop/zebra/io/TestDropColumnGroup.java gates