Author: gates
Date: Mon Oct 13 21:54:03 2014
New Revision: 1631569
URL: http://svn.apache.org/r1631569
Log:
HIVE-8368 compactor is improperly writing delete records in base file (Alan
Gates, reviewed by Eugene Koifman)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java
Mon Oct 13 21:54:03 2014
@@ -155,6 +155,8 @@ public interface AcidInputFormat<KEY ext
public static interface RawReader<V>
extends RecordReader<RecordIdentifier, V> {
public ObjectInspector getObjectInspector();
+
+ public boolean isDelete(V value);
}
/**
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
Mon Oct 13 21:54:03 2014
@@ -664,6 +664,11 @@ public class OrcRawRecordMerger implemen
(OrcStruct.createObjectInspector(rowType));
}
+  @Override
+  public boolean isDelete(OrcStruct value) {
+    return OrcRecordUpdater.getOperation(value) == OrcRecordUpdater.DELETE_OPERATION;
+  }
+
/**
* Get the number of columns in the underlying rows.
* @return 0 if there are no base and no deltas.
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
Mon Oct 13 21:54:03 2014
@@ -506,13 +506,15 @@ public class CompactorMR {
ValidTxnList txnList =
new ValidTxnListImpl(jobConf.get(ValidTxnList.VALID_TXNS_KEY));
+ boolean isMajor = jobConf.getBoolean(IS_MAJOR, false);
AcidInputFormat.RawReader<V> reader =
- aif.getRawReader(jobConf, jobConf.getBoolean(IS_MAJOR, false),
split.getBucket(),
+ aif.getRawReader(jobConf, isMajor, split.getBucket(),
txnList, split.getBaseDir(), split.getDeltaDirs());
RecordIdentifier identifier = reader.createKey();
V value = reader.createValue();
getWriter(reporter, reader.getObjectInspector(), split.getBucket());
while (reader.next(identifier, value)) {
+ if (isMajor && reader.isDelete(value)) continue;
writer.write(value);
reporter.progress();
}
Modified:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
---
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
(original)
+++
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
Mon Oct 13 21:54:03 2014
@@ -56,6 +56,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
public class TestOrcRawRecordMerger {
@@ -574,12 +575,14 @@ public class TestOrcRawRecordMerger {
OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET, 0, 200), id);
assertEquals("update 1", getValue(event));
+ assertFalse(merger.isDelete(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET, 1, 0), id);
assertEquals("second", getValue(event));
+ assertFalse(merger.isDelete(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.UPDATE_OPERATION,
@@ -616,6 +619,7 @@ public class TestOrcRawRecordMerger {
OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
assertNull(OrcRecordUpdater.getRow(event));
+ assertTrue(merger.isDelete(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
Modified:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
---
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
(original)
+++
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
Mon Oct 13 21:54:03 2014
@@ -339,6 +339,7 @@ public abstract class CompactorTest {
private final Configuration conf;
private FSDataInputStream is = null;
private final FileSystem fs;
+ private boolean lastWasDelete = true;
MockRawReader(Configuration conf, List<Path> files) throws IOException {
filesToRead = new Stack<Path>();
@@ -353,6 +354,15 @@ public abstract class CompactorTest {
}
   @Override
+  public boolean isDelete(Text value) {
+    // Alternate between returning deleted and not.  This is easier than actually
+    // tracking operations.  We test that this is getting properly called by checking that only
+    // half the records show up in base files after major compactions.
+    lastWasDelete = !lastWasDelete;
+    return lastWasDelete;
+  }
+
+  @Override
+ @Override
public boolean next(RecordIdentifier identifier, Text text) throws
IOException {
if (is == null) {
// Open the next file
Modified:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java?rev=1631569&r1=1631568&r2=1631569&view=diff
==============================================================================
---
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
(original)
+++
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java
Mon Oct 13 21:54:03 2014
@@ -418,8 +418,8 @@ public class TestWorker extends Compacto
Assert.assertEquals(2, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(1248L, buckets[0].getLen());
- Assert.assertEquals(1248L, buckets[1].getLen());
+ Assert.assertEquals(624L, buckets[0].getLen());
+ Assert.assertEquals(624L, buckets[1].getLen());
} else {
LOG.debug("This is not the file you are looking for " +
stat[i].getPath().getName());
}
@@ -464,8 +464,8 @@ public class TestWorker extends Compacto
Assert.assertEquals(2, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(1248L, buckets[0].getLen());
- Assert.assertEquals(1248L, buckets[1].getLen());
+ Assert.assertEquals(624L, buckets[0].getLen());
+ Assert.assertEquals(624L, buckets[1].getLen());
} else {
LOG.debug("This is not the file you are looking for " +
stat[i].getPath().getName());
}
@@ -507,8 +507,8 @@ public class TestWorker extends Compacto
Assert.assertEquals(2, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(208L, buckets[0].getLen());
- Assert.assertEquals(208L, buckets[1].getLen());
+ Assert.assertEquals(104L, buckets[0].getLen());
+ Assert.assertEquals(104L, buckets[1].getLen());
} else {
LOG.debug("This is not the file you are looking for " +
stat[i].getPath().getName());
}
@@ -551,8 +551,8 @@ public class TestWorker extends Compacto
Assert.assertEquals(2, buckets.length);
Assert.assertTrue(buckets[0].getPath().getName().matches("bucket_0000[01]"));
Assert.assertTrue(buckets[1].getPath().getName().matches("bucket_0000[01]"));
- Assert.assertEquals(1248L, buckets[0].getLen());
- Assert.assertEquals(1248L, buckets[1].getLen());
+ Assert.assertEquals(624L, buckets[0].getLen());
+ Assert.assertEquals(624L, buckets[1].getLen());
} else {
LOG.debug("This is not the file you are looking for " +
stat[i].getPath().getName());
}
@@ -606,9 +606,10 @@ public class TestWorker extends Compacto
Table t = newTable("default", "mapwbmb", true);
Partition p = newPartition(t, "today");
+
addBaseFile(t, p, 20L, 20, 2, false);
addDeltaFile(t, p, 21L, 22L, 2, 2, false);
- addDeltaFile(t, p, 23L, 24L, 2);
+ addDeltaFile(t, p, 23L, 26L, 4);
burnThroughTransactions(25);
@@ -631,7 +632,7 @@ public class TestWorker extends Compacto
// Find the new delta file and make sure it has the right contents
boolean sawNewBase = false;
for (int i = 0; i < stat.length; i++) {
- if (stat[i].getPath().getName().equals("base_0000024")) {
+ if (stat[i].getPath().getName().equals("base_0000026")) {
sawNewBase = true;
FileStatus[] buckets = fs.listStatus(stat[i].getPath());
Assert.assertEquals(2, buckets.length);
@@ -640,10 +641,12 @@ public class TestWorker extends Compacto
// Bucket 0 should be small and bucket 1 should be large, make sure
that's the case
Assert.assertTrue(
("bucket_00000".equals(buckets[0].getPath().getName()) && 104L ==
buckets[0].getLen()
- && "bucket_00001".equals(buckets[1].getPath().getName()) && 1248L
== buckets[1] .getLen())
+ && "bucket_00001".equals(buckets[1].getPath().getName()) && 676L
== buckets[1]
+ .getLen())
||
("bucket_00000".equals(buckets[1].getPath().getName()) && 104L ==
buckets[1].getLen()
- && "bucket_00001".equals(buckets[0].getPath().getName()) && 1248L
== buckets[0] .getLen())
+ && "bucket_00001".equals(buckets[0].getPath().getName()) && 676L
== buckets[0]
+ .getLen())
);
} else {
LOG.debug("This is not the file you are looking for " +
stat[i].getPath().getName());