Author: gates
Date: Thu Sep 4 15:29:56 2014
New Revision: 1622499
URL: http://svn.apache.org/r1622499
Log:
HIVE-7571 RecordUpdater should read virtual columns from row (Alan Gates,
reviewed by Owen O'Malley)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java?rev=1622499&r1=1622498&r2=1622499&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java
Thu Sep 4 15:29:56 2014
@@ -52,6 +52,7 @@ public interface AcidOutputFormat<K exte
private int bucket;
private PrintStream dummyStream = null;
private boolean oldStyle = false;
+ private int recIdCol = -1; // Column the record identifier is in, -1
indicates no record id
/**
* Create the options object.
@@ -164,6 +165,16 @@ public interface AcidOutputFormat<K exte
}
/**
+ * Which column the row id field is in.
+ * @param recIdCol
+ * @return this
+ */
+ public Options recordIdColumn(int recIdCol) {
+ this.recIdCol = recIdCol;
+ return this;
+ }
+
+ /**
* Temporary switch while we are in development that replaces the
* implementation with a dummy one that just prints to stream.
* @param stream the stream to print to
@@ -214,6 +225,10 @@ public interface AcidOutputFormat<K exte
return bucket;
}
+ public int getRecordIdColumn() {
+ return recIdCol;
+ }
+
public PrintStream getDummyStream() {
return dummyStream;
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java?rev=1622499&r1=1622498&r2=1622499&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java
(original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/RecordUpdater.java Thu
Sep 4 15:29:56 2014
@@ -40,26 +40,17 @@ public interface RecordUpdater {
/**
* Update an old record with a new set of values.
* @param currentTransaction the current transaction id
- * @param originalTransaction the row's original transaction id
- * @param rowId the original row id
* @param row the new values for the row
* @throws IOException
*/
- void update(long currentTransaction,
- long originalTransaction,
- long rowId,
- Object row) throws IOException;
+ void update(long currentTransaction, Object row) throws IOException;
/**
* Delete a row from the table.
* @param currentTransaction the current transaction id
- * @param originalTransaction the rows original transaction id
- * @param rowId the original row id
* @throws IOException
*/
- void delete(long currentTransaction,
- long originalTransaction,
- long rowId) throws IOException;
+ void delete(long currentTransaction, Object row) throws IOException;
/**
* Flush the current set of rows to the underlying file system, so that
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java?rev=1622499&r1=1622498&r2=1622499&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcOutputFormat.java
Thu Sep 4 15:29:56 2014
@@ -211,18 +211,14 @@ public class OrcOutputFormat extends Fil
}
@Override
- public void update(long currentTransaction, long originalTransaction,
- long rowId, Object row) throws IOException {
+ public void update(long currentTransaction, Object row) throws IOException
{
out.println("update " + path + " currTxn: " + currentTransaction +
- " origTxn: " + originalTransaction + " row: " + rowId + " obj: " +
- stringifyObject(row, inspector));
+ " obj: " + stringifyObject(row, inspector));
}
@Override
- public void delete(long currentTransaction, long originalTransaction,
- long rowId) throws IOException {
- out.println("delete " + path + " currTxn: " + currentTransaction +
- " origTxn: " + originalTransaction + " row: " + rowId);
+ public void delete(long currentTransaction, Object row) throws IOException
{
+ out.println("delete " + path + " currTxn: " + currentTransaction + "
obj: " + row);
}
@Override
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java?rev=1622499&r1=1622498&r2=1622499&view=diff
==============================================================================
---
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
(original)
+++
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java
Thu Sep 4 15:29:56 2014
@@ -31,18 +31,18 @@ import org.apache.hadoop.hive.ql.io.Reco
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import
org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
import
org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
-import java.nio.charset.CharsetEncoder;
import java.util.ArrayList;
import java.util.List;
@@ -92,6 +92,14 @@ public class OrcRecordUpdater implements
// because that is monotonically increasing to give new unique row ids.
private long rowCountDelta = 0;
private final KeyIndexBuilder indexBuilder = new KeyIndexBuilder();
+ private StructField recIdField = null; // field to look for the record
identifier in
+ private StructField rowIdField = null; // field inside recId to look for row
id in
+ private StructField originalTxnField = null; // field inside recId to look
for original txn in
+ private StructObjectInspector rowInspector; // OI for the original row
+ private StructObjectInspector recIdInspector; // OI for the record
identifier struct
+ private LongObjectInspector rowIdInspector; // OI for the long row id inside
the recordIdentifier
+ private LongObjectInspector origTxnInspector; // OI for the original txn
inside the record
+ // identifier
static class AcidStats {
long inserts;
@@ -179,7 +187,7 @@ public class OrcRecordUpdater implements
* @param rowInspector the row's object inspector
* @return an object inspector for the event stream
*/
- static ObjectInspector createEventSchema(ObjectInspector rowInspector) {
+ static StructObjectInspector createEventSchema(ObjectInspector rowInspector)
{
List<StructField> fields = new ArrayList<StructField>();
fields.add(new OrcStruct.Field("operation",
PrimitiveObjectInspectorFactory.writableIntObjectInspector,
OPERATION));
@@ -237,7 +245,9 @@ public class OrcRecordUpdater implements
writerOptions.bufferSize(DELTA_BUFFER_SIZE);
writerOptions.stripeSize(DELTA_STRIPE_SIZE);
}
- writerOptions.inspector(createEventSchema(options.getInspector()));
+ rowInspector = (StructObjectInspector)options.getInspector();
+ writerOptions.inspector(createEventSchema(findRecId(options.getInspector(),
+ options.getRecordIdColumn())));
this.writer = OrcFile.createWriter(this.path, writerOptions);
item = new OrcStruct(FIELDS);
item.setFieldValue(OPERATION, operation);
@@ -247,14 +257,50 @@ public class OrcRecordUpdater implements
item.setFieldValue(ROW_ID, rowId);
}
- private void addEvent(int operation, long currentTransaction,
- long originalTransaction, long rowId,
- Object row) throws IOException {
+ // Find the record identifier column (if there) and return a possibly new
ObjectInspector that
+ // will strip out the record id for the underlying writer.
+ private ObjectInspector findRecId(ObjectInspector inspector, int
rowIdColNum) {
+ if (!(inspector instanceof StructObjectInspector)) {
+ throw new RuntimeException("Serious problem, expected a
StructObjectInspector, but got a " +
+ inspector.getClass().getName());
+ }
+ if (rowIdColNum < 0) {
+ return inspector;
+ } else {
+ RecIdStrippingObjectInspector newInspector =
+ new RecIdStrippingObjectInspector(inspector, rowIdColNum);
+ recIdField = newInspector.getRecId();
+ List<? extends StructField> fields =
+ ((StructObjectInspector)
recIdField.getFieldObjectInspector()).getAllStructFieldRefs();
+ // Go by position, not field name, as field names aren't guaranteed.
The order of fields
+ // in RecordIdentifier is transactionId, bucketId, rowId
+ originalTxnField = fields.get(0);
+ origTxnInspector =
(LongObjectInspector)originalTxnField.getFieldObjectInspector();
+ rowIdField = fields.get(2);
+ rowIdInspector =
(LongObjectInspector)rowIdField.getFieldObjectInspector();
+
+
+ recIdInspector = (StructObjectInspector)
recIdField.getFieldObjectInspector();
+ return newInspector;
+ }
+ }
+
+ private void addEvent(int operation, long currentTransaction, long rowId,
Object row)
+ throws IOException {
this.operation.set(operation);
this.currentTransaction.set(currentTransaction);
- this.originalTransaction.set(originalTransaction);
+ // If this is an insert, originalTransaction should be set to this
transaction. If not,
+ // it will be reset by the following if anyway.
+ long originalTransaction = currentTransaction;
+ if (operation == DELETE_OPERATION || operation == UPDATE_OPERATION) {
+ Object rowIdValue = rowInspector.getStructFieldData(row, recIdField);
+ originalTransaction = origTxnInspector.get(
+ recIdInspector.getStructFieldData(rowIdValue, originalTxnField));
+ rowId = rowIdInspector.get(recIdInspector.getStructFieldData(rowIdValue,
rowIdField));
+ }
this.rowId.set(rowId);
- item.setFieldValue(OrcRecordUpdater.ROW, row);
+ this.originalTransaction.set(originalTransaction);
+ item.setFieldValue(OrcRecordUpdater.ROW, (operation == DELETE_OPERATION ?
null : row));
indexBuilder.addKey(operation, originalTransaction, bucket.get(), rowId);
writer.addRow(item);
}
@@ -264,30 +310,26 @@ public class OrcRecordUpdater implements
if (this.currentTransaction.get() != currentTransaction) {
insertedRows = 0;
}
- addEvent(INSERT_OPERATION, currentTransaction, currentTransaction,
- insertedRows++, row);
+ addEvent(INSERT_OPERATION, currentTransaction, insertedRows++, row);
rowCountDelta++;
}
@Override
- public void update(long currentTransaction, long originalTransaction,
- long rowId, Object row) throws IOException {
+ public void update(long currentTransaction, Object row) throws IOException {
if (this.currentTransaction.get() != currentTransaction) {
insertedRows = 0;
}
- addEvent(UPDATE_OPERATION, currentTransaction, originalTransaction, rowId,
- row);
+ addEvent(UPDATE_OPERATION, currentTransaction, -1L, row);
}
@Override
- public void delete(long currentTransaction, long originalTransaction,
- long rowId) throws IOException {
+ public void delete(long currentTransaction, Object row) throws IOException {
if (this.currentTransaction.get() != currentTransaction) {
insertedRows = 0;
}
- addEvent(DELETE_OPERATION, currentTransaction, originalTransaction, rowId,
- null);
+ addEvent(DELETE_OPERATION, currentTransaction, -1, row);
rowCountDelta--;
+
}
@Override
@@ -311,7 +353,7 @@ public class OrcRecordUpdater implements
fs.delete(path, false);
}
} else {
- writer.close();
+ if (writer != null) writer.close();
}
if (flushLengths != null) {
flushLengths.close();
@@ -406,4 +448,67 @@ public class OrcRecordUpdater implements
lastRowId = rowId;
}
}
+
+ /**
+ * An ObjectInspector that will strip out the record identifier so that the
underlying writer
+ * doesn't see it.
+ */
+ private static class RecIdStrippingObjectInspector extends
StructObjectInspector {
+ private StructObjectInspector wrapped;
+ List<StructField> fields;
+ StructField recId;
+
+ RecIdStrippingObjectInspector(ObjectInspector oi, int rowIdColNum) {
+ if (!(oi instanceof StructObjectInspector)) {
+ throw new RuntimeException("Serious problem, expected a
StructObjectInspector, " +
+ "but got a " + oi.getClass().getName());
+ }
+ wrapped = (StructObjectInspector)oi;
+ List<? extends StructField> wrappedFields =
wrapped.getAllStructFieldRefs();
+ fields = new
ArrayList<StructField>(wrapped.getAllStructFieldRefs().size());
+ for (int i = 0; i < wrappedFields.size(); i++) {
+ if (i == rowIdColNum) {
+ recId = wrappedFields.get(i);
+ } else {
+ fields.add(wrappedFields.get(i));
+ }
+ }
+ }
+
+ @Override
+ public List<? extends StructField> getAllStructFieldRefs() {
+ return fields;
+ }
+
+ @Override
+ public StructField getStructFieldRef(String fieldName) {
+ return wrapped.getStructFieldRef(fieldName);
+ }
+
+ @Override
+ public Object getStructFieldData(Object data, StructField fieldRef) {
+ // For performance don't check that the fieldRef isn't recId
every time,
+ // just assume that the caller used getAllStructFieldRefs and thus
doesn't have that fieldRef
+ return wrapped.getStructFieldData(data, fieldRef);
+ }
+
+ @Override
+ public List<Object> getStructFieldsDataAsList(Object data) {
+ return wrapped.getStructFieldsDataAsList(data);
+ }
+
+ @Override
+ public String getTypeName() {
+ return wrapped.getTypeName();
+ }
+
+ @Override
+ public Category getCategory() {
+ return wrapped.getCategory();
+ }
+
+ StructField getRecId() {
+ return recId;
+ }
+ }
}
Modified:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java?rev=1622499&r1=1622498&r2=1622499&view=diff
==============================================================================
---
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
(original)
+++
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
Thu Sep 4 15:29:56 2014
@@ -56,6 +56,7 @@ import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNull;
public class TestOrcRawRecordMerger {
@@ -454,9 +455,16 @@ public class TestOrcRawRecordMerger {
static class MyRow {
Text col1;
+ RecordIdentifier ROW__ID;
+
MyRow(String val) {
col1 = new Text(val);
}
+
+ MyRow(String val, long rowId, long origTxn, int bucket) {
+ col1 = new Text(val);
+ ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+ }
}
static String getValue(OrcStruct event) {
@@ -533,12 +541,12 @@ public class TestOrcRawRecordMerger {
// write a delta
ru = of.getRecordUpdater(root, options.writingBase(false)
- .minimumTransactionId(200).maximumTransactionId(200));
- ru.update(200, 0, 0, new MyRow("update 1"));
- ru.update(200, 0, 2, new MyRow("update 2"));
- ru.update(200, 0, 3, new MyRow("update 3"));
- ru.delete(200, 0, 7);
- ru.delete(200, 0, 8);
+
.minimumTransactionId(200).maximumTransactionId(200).recordIdColumn(1));
+ ru.update(200, new MyRow("update 1", 0, 0, BUCKET));
+ ru.update(200, new MyRow("update 2", 2, 0, BUCKET));
+ ru.update(200, new MyRow("update 3", 3, 0, BUCKET));
+ ru.delete(200, new MyRow("", 7, 0, BUCKET));
+ ru.delete(200, new MyRow("", 8, 0, BUCKET));
ru.close(false);
ValidTxnList txnList = new ValidTxnListImpl("200:");
@@ -607,13 +615,13 @@ public class TestOrcRawRecordMerger {
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
- assertEquals(null, OrcRecordUpdater.getRow(event));
+ assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
- assertEquals(null, OrcRecordUpdater.getRow(event));
+ assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
@@ -693,7 +701,7 @@ public class TestOrcRawRecordMerger {
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET, 7, 200), id);
- assertEquals(null, OrcRecordUpdater.getRow(event));
+ assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
@@ -705,8 +713,7 @@ public class TestOrcRawRecordMerger {
assertEquals(OrcRecordUpdater.DELETE_OPERATION,
OrcRecordUpdater.getOperation(event));
assertEquals(new ReaderKey(0, BUCKET, 8, 200), id);
- assertEquals(null, OrcRecordUpdater.getRow(event));
-
+ assertNull(OrcRecordUpdater.getRow(event));
assertEquals(true, merger.next(id, event));
assertEquals(OrcRecordUpdater.INSERT_OPERATION,
OrcRecordUpdater.getOperation(event));
@@ -747,6 +754,7 @@ public class TestOrcRawRecordMerger {
Text mytext;
float myfloat;
double mydouble;
+ RecordIdentifier ROW__ID;
BigRow(int myint, long mylong, String mytext, float myfloat, double
mydouble) {
this.myint = myint;
@@ -754,6 +762,21 @@ public class TestOrcRawRecordMerger {
this.mytext = new Text(mytext);
this.myfloat = myfloat;
this.mydouble = mydouble;
+ ROW__ID = null;
+ }
+
+ BigRow(int myint, long mylong, String mytext, float myfloat, double
mydouble,
+ long rowId, long origTxn, int bucket) {
+ this.myint = myint;
+ this.mylong = mylong;
+ this.mytext = new Text(mytext);
+ this.myfloat = myfloat;
+ this.mydouble = mydouble;
+ ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
+ }
+
+ BigRow(long rowId, long origTxn, int bucket) {
+ ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
}
}
@@ -802,16 +825,16 @@ public class TestOrcRawRecordMerger {
// write a delta
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
.writingBase(false).minimumTransactionId(1).maximumTransactionId(1)
- .bucket(BUCKET).inspector(inspector).filesystem(fs);
+ .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5);
RecordUpdater ru = of.getRecordUpdater(root, options);
values = new String[]{"0.0", null, null, "1.1", null, null, null,
"ignore.7"};
for(int i=0; i < values.length; ++i) {
if (values[i] != null) {
- ru.update(1, 0, i, new BigRow(i, i, values[i], i, i));
+ ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
}
}
- ru.delete(100, 0, 9);
+ ru.delete(100, new BigRow(9, 0, BUCKET));
ru.close(false);
// write a delta
@@ -820,10 +843,10 @@ public class TestOrcRawRecordMerger {
values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
for(int i=0; i < values.length; ++i) {
if (values[i] != null) {
- ru.update(2, 0, i, new BigRow(i, i, values[i], i, i));
+ ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
}
}
- ru.delete(100, 0, 8);
+ ru.delete(100, new BigRow(8, 0, BUCKET));
ru.close(false);
InputFormat inf = new OrcInputFormat();
@@ -902,16 +925,16 @@ public class TestOrcRawRecordMerger {
ru.close(false);
// write a delta
- options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1);
+
options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1).recordIdColumn(5);
ru = of.getRecordUpdater(root, options);
values = new String[]{"0.0", null, null, "1.1", null, null, null,
"ignore.7"};
for(int i=0; i < values.length; ++i) {
if (values[i] != null) {
- ru.update(1, 0, i, new BigRow(i, i, values[i], i, i));
+ ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
}
}
- ru.delete(100, 0, 9);
+ ru.delete(100, new BigRow(9, 0, BUCKET));
ru.close(false);
// write a delta
@@ -920,10 +943,10 @@ public class TestOrcRawRecordMerger {
values = new String[]{null, null, "1.0", null, null, null, null, "3.1"};
for(int i=0; i < values.length; ++i) {
if (values[i] != null) {
- ru.update(2, 0, i, new BigRow(i, i, values[i], i, i));
+ ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET));
}
}
- ru.delete(100, 0, 8);
+ ru.delete(100, new BigRow(8, 0, BUCKET));
ru.close(false);
InputFormat inf = new OrcInputFormat();
Modified:
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
URL:
http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java?rev=1622499&r1=1622498&r2=1622499&view=diff
==============================================================================
---
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
(original)
+++
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRecordUpdater.java
Thu Sep 4 15:29:56 2014
@@ -23,8 +23,8 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.RecordIdentifier;
import org.apache.hadoop.hive.ql.io.RecordUpdater;
-import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.io.IntWritable;
@@ -37,6 +37,7 @@ import java.io.DataInputStream;
import java.io.File;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
public class TestOrcRecordUpdater {
@@ -64,9 +65,18 @@ public class TestOrcRecordUpdater {
static class MyRow {
Text field;
+ RecordIdentifier ROW__ID;
+
MyRow(String val) {
field = new Text(val);
+ ROW__ID = null;
+ }
+
+ MyRow(String val, long rowId, long origTxn, int bucket) {
+ field = new Text(val);
+ ROW__ID = new RecordIdentifier(origTxn, bucket, rowId);
}
+
}
@Test
@@ -178,17 +188,19 @@ public class TestOrcRecordUpdater {
inspector = ObjectInspectorFactory.getReflectionObjectInspector
(MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
}
+ int bucket = 20;
AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
.filesystem(fs)
- .bucket(20)
+ .bucket(bucket)
.writingBase(false)
.minimumTransactionId(100)
.maximumTransactionId(100)
.inspector(inspector)
- .reporter(Reporter.NULL);
+ .reporter(Reporter.NULL)
+ .recordIdColumn(1);
RecordUpdater updater = new OrcRecordUpdater(root, options);
- updater.update(100, 10, 30, new MyRow("update"));
- updater.delete(100, 40, 60);
+ updater.update(100, new MyRow("update", 30, 10, bucket));
+ updater.delete(100, new MyRow("", 60, 40, bucket));
assertEquals(-1L, updater.getStats().getRowCount());
updater.close(false);
Path bucketPath = AcidUtils.createFilename(root, options);
@@ -216,7 +228,7 @@ public class TestOrcRecordUpdater {
assertEquals(40, OrcRecordUpdater.getOriginalTransaction(row));
assertEquals(20, OrcRecordUpdater.getBucket(row));
assertEquals(60, OrcRecordUpdater.getRowId(row));
- assertEquals(null, OrcRecordUpdater.getRow(row));
+ assertNull(OrcRecordUpdater.getRow(row));
assertEquals(false, rows.hasNext());
}
}