Author: gates
Date: Mon Oct 13 22:00:36 2014
New Revision: 1631574

URL: http://svn.apache.org/r1631574
Log:
HIVE-8368 compactor is improperly writing delete records in base file (Alan Gates reviewed by Eugene Koifman)

Modified:
    
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
    
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
    
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
    
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
    
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
    
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java

Modified: 
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java?rev=1631574&r1=1631573&r2=1631574&view=diff
==============================================================================
--- 
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
 (original)
+++ 
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
 Mon Oct 13 22:00:36 2014
@@ -155,6 +155,8 @@ public interface AcidInputFormat<KEY ext
   public static interface RawReader<V>
       extends RecordReader<RecordIdentifier, V> {
     public ObjectInspector getObjectInspector();
+
+    public boolean isDelete(V value);
   }
 
   /**

Modified: 
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java?rev=1631574&r1=1631573&r2=1631574&view=diff
==============================================================================
--- 
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
 (original)
+++ 
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
 Mon Oct 13 22:00:36 2014
@@ -664,6 +664,11 @@ public class OrcRawRecordMerger implemen
         (OrcStruct.createObjectInspector(rowType));
   }
 
+  @Override
+  public boolean isDelete(OrcStruct value) {
+    return OrcRecordUpdater.getOperation(value) == OrcRecordUpdater.DELETE_OPERATION;
+  }
+
   /**
    * Get the number of columns in the underlying rows.
    * @return 0 if there are no base and no deltas.

Modified: 
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1631574&r1=1631573&r2=1631574&view=diff
==============================================================================
--- 
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
 (original)
+++ 
hive/branches/branch-0.14/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
 Mon Oct 13 22:00:36 2014
@@ -506,13 +506,15 @@ public class CompactorMR {
       ValidTxnList txnList =
           new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
 
+      boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
       AcidInputFormat.RawReader<V> reader =
-          aif.getRawReader(jobConf, jobConf.getBoolean(IS_MAJOR, false), split.getBucket(),
+          aif.getRawReader(jobConf, isMajor, split.getBucket(),
               txnList, split.getBaseDir(), split.getDeltaDirs());
       RecordIdentifier identifier = reader.createKey();
       V value = reader.createValue();
       getWriter(reporter, reader.getObjectInspector(), split.getBucket());
       while (reader.next(identifier, value)) {
+        if (isMajor && reader.isDelete(value)) continue;
         writer.write(value);
         reporter.progress();
       }

Modified: 
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java?rev=1631574&r1=1631573&r2=1631574&view=diff
==============================================================================
--- 
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
 (original)
+++ 
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
 Mon Oct 13 22:00:36 2014
@@ -56,6 +56,7 @@ import java.util.List;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 
 public class TestOrcRawRecordMerger {
@@ -574,12 +575,14 @@ public class TestOrcRawRecordMerger {
         OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
     assertEquals("update 1", getValue(event));
+    assertFalse(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.INSERT_OPERATION,
         OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
     assertEquals("second", getValue(event));
+    assertFalse(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
@@ -616,6 +619,7 @@ public class TestOrcRawRecordMerger {
         OrcRecordUpdater.getOperation(event));
     assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
     assertNull(OrcRecordUpdater.getRow(event));
+    assertTrue(merger.isDelete(event));
 
     assertEquals(true, merger.next(id, event));
     assertEquals(OrcRecordUpdater.DELETE_OPERATION,

Modified: 
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java?rev=1631574&r1=1631573&r2=1631574&view=diff
==============================================================================
--- 
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
 (original)
+++ 
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
 Mon Oct 13 22:00:36 2014
@@ -339,6 +339,7 @@ public abstract class CompactorTest {
     private final Configuration conf;
     private FSDataInputStream is = null;
     private final FileSystem fs;
+    private boolean lastWasDelete = true;
 
     MockRawReader(Configuration conf, List<Path> files) throws IOException {
       filesToRead = new Stack<Path>();
@@ -353,6 +354,15 @@ public abstract class CompactorTest {
     }
 
     @Override
+    public boolean isDelete(Text value) {
+      // Alternate between returning deleted and not.  This is easier than actually
+      // tracking operations. We test that this is getting properly called by checking that only
+      // half the records show up in base files after major compactions.
+      lastWasDelete = !lastWasDelete;
+      return lastWasDelete;
+    }
+
+    @Override
     public boolean next(RecordIdentifier identifier, Text text) throws IOException {
       if (is == null) {
         // Open the next file

Modified: 
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
URL: 
http://svn.apache.org/viewvc/hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java?rev=1631574&r1=1631573&r2=1631574&view=diff
==============================================================================
--- 
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
 (original)
+++ 
hive/branches/branch-0.14/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
 Mon Oct 13 22:00:36 2014
@@ -418,8 +418,8 @@ public class TestWorker extends Compacto
         Assert.assertEquals(2, buckets.length);
         
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(1248L, buckets[0].getLen());
-        Assert.assertEquals(1248L, buckets[1].getLen());
+        Assert.assertEquals(624L, buckets[0].getLen());
+        Assert.assertEquals(624L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
@@ -464,8 +464,8 @@ public class TestWorker extends Compacto
         Assert.assertEquals(2, buckets.length);
         
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(1248L, buckets[0].getLen());
-        Assert.assertEquals(1248L, buckets[1].getLen());
+        Assert.assertEquals(624L, buckets[0].getLen());
+        Assert.assertEquals(624L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
@@ -507,8 +507,8 @@ public class TestWorker extends Compacto
         Assert.assertEquals(2, buckets.length);
         
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(208L, buckets[0].getLen());
-        Assert.assertEquals(208L, buckets[1].getLen());
+        Assert.assertEquals(104L, buckets[0].getLen());
+        Assert.assertEquals(104L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
@@ -551,8 +551,8 @@ public class TestWorker extends Compacto
         Assert.assertEquals(2, buckets.length);
         
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
         
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
-        Assert.assertEquals(1248L, buckets[0].getLen());
-        Assert.assertEquals(1248L, buckets[1].getLen());
+        Assert.assertEquals(624L, buckets[0].getLen());
+        Assert.assertEquals(624L, buckets[1].getLen());
       } else {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());
       }
@@ -606,9 +606,10 @@ public class TestWorker extends Compacto
     Table t = newTable("default", "mapwbmb", true);
     Partition p = newPartition(t, "today");
 
+
     addBaseFile(t, p, 20L, 20, 2, false);
     addDeltaFile(t, p, 21L, 22L, 2, 2, false);
-    addDeltaFile(t, p, 23L, 24L, 2);
+    addDeltaFile(t, p, 23L, 26L, 4);
 
     burnThroughTransactions(25);
 
@@ -631,7 +632,7 @@ public class TestWorker extends Compacto
     // Find the new delta file and make sure it has the right contents
     boolean sawNewBase = false;
     for (int i = 0; i < stat.length; i++) {
-      if (stat[i].getPath().getName().equals("base_0000024")) {
+      if (stat[i].getPath().getName().equals("base_0000026")) {
         sawNewBase = true;
         FileStatus[] buckets = fs.listStatus(stat[i].getPath());
         Assert.assertEquals(2, buckets.length);
@@ -640,10 +641,12 @@ public class TestWorker extends Compacto
         // Bucket 0 should be small and bucket 1 should be large, make sure that's the case
         Assert.assertTrue(
             ("bucket_00000".equals(buckets[0].getPath().getName()) && 104L == 
buckets[0].getLen()
-            && "bucket_00001".equals(buckets[1].getPath().getName()) && 1248L 
== buckets[1] .getLen())
+            && "bucket_00001".equals(buckets[1].getPath().getName()) && 676L 
== buckets[1]
+                .getLen())
             ||
             ("bucket_00000".equals(buckets[1].getPath().getName()) && 104L == 
buckets[1].getLen()
-            && "bucket_00001".equals(buckets[0].getPath().getName()) && 1248L 
== buckets[0] .getLen())
+            && "bucket_00001".equals(buckets[0].getPath().getName()) && 676L 
== buckets[0]
+                .getLen())
         );
       } else {
         LOG.debug("This is not the file you are looking for " + 
stat[i].getPath().getName());


Reply via email to