This is an automated email from the ASF dual-hosted git repository.
klcopp pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 414a2f491e HIVE-26147: OrcRawRecordMerger throws NPE when
hive.acid.key.index is missing for an acid file (Alessandro Solimando, reviewed
by Aman Sinha and Karen Coppage)
414a2f491e is described below
commit 414a2f491ea7f7491c113e8e410b017bac36be5b
Author: Alessandro Solimando <[email protected]>
AuthorDate: Tue Apr 19 17:07:06 2022 +0200
HIVE-26147: OrcRawRecordMerger throws NPE when hive.acid.key.index is
missing for an acid file (Alessandro Solimando, reviewed by Aman Sinha and
Karen Coppage)
Closes #3219.
---
.../hadoop/hive/ql/io/orc/OrcRawRecordMerger.java | 17 +-
.../hive/ql/io/orc/TestOrcRawRecordMerger.java | 842 ++++++++++-----------
2 files changed, 426 insertions(+), 433 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index db64704270..509fd40e3e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -763,15 +763,20 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
}
/**
- * Find the key range for the split (of the base). These are used to filter
delta files since
- * both are sorted by key.
+ * Find the key range for the split (of the base) based on the
'hive.acid.key.index' metadata.
+ * These keys are used to filter delta files since both are sorted by key.
+ * If 'hive.acid.key.index' is missing from the ORC file, return null keys
(which forces a full read).
* @param reader the reader
* @param options the options for reading with
- * @throws IOException
*/
- private KeyInterval discoverKeyBounds(Reader reader,
- Reader.Options options) throws IOException {
- RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
+ private KeyInterval discoverKeyBounds(Reader reader, Reader.Options options)
{
+ final RecordIdentifier[] keyIndex = OrcRecordUpdater.parseKeyIndex(reader);
+ if (keyIndex == null) {
+ LOG.warn("Missing '{}' metadata in ORC file, can't compute min/max keys",
+ OrcRecordUpdater.ACID_KEY_INDEX_NAME);
+ return new KeyInterval(null, null);
+ }
+
long offset = options.getOffset();
long maxOffset = options.getMaxOffset();
int firstStripe = 0;
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
index 76a96e4514..681943285f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java
@@ -69,14 +69,18 @@ import com.google.common.collect.Lists;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@@ -84,16 +88,16 @@ public class TestOrcRawRecordMerger {
private static final Logger LOG =
LoggerFactory.getLogger(TestOrcRawRecordMerger.class);
@Test
- public void testOrdering() throws Exception {
+ public void testOrdering() {
ReaderKey left = new ReaderKey(100, 200, 1200, 300);
ReaderKey right = new ReaderKey();
right.setValues(100, 200, 1000, 200, false);
assertTrue(right.compareTo(left) < 0);
assertTrue(left.compareTo(right) > 0);
- assertEquals(false, left.equals(right));
+ assertNotEquals(left, right);
left.set(right);
- assertTrue(right.compareTo(left) == 0);
- assertEquals(true, right.equals(left));
+ assertEquals(0, right.compareTo(left));
+ assertEquals(right, left);
right.setRowId(2000);
assertTrue(right.compareTo(left) > 0);
left.setValues(1, 2, 3, 4, false);
@@ -113,8 +117,8 @@ public class TestOrcRawRecordMerger {
RecordIdentifier ri = new RecordIdentifier(1, 2, 3);
assertEquals(1, ri.compareTo(left));
assertEquals(-1, left.compareTo(ri));
- assertEquals(false, ri.equals(left));
- assertEquals(false, left.equals(ri));
+ assertNotEquals(ri, left);
+ assertNotEquals(left, ri);
}
@Test
@@ -189,7 +193,7 @@ public class TestOrcRawRecordMerger {
private List<StripeInformation> createStripes(long... rowCounts) {
long offset = 0;
List<StripeInformation> result =
- new ArrayList<StripeInformation>(rowCounts.length);
+ new ArrayList<>(rowCounts.length);
int stripeCount = 0;
for(long count: rowCounts) {
OrcProto.StripeInformation.Builder stripe =
@@ -237,6 +241,37 @@ public class TestOrcRawRecordMerger {
return reader;
}
+ private static <T> void checkReaderRecord(
+ int writeId, int bucketProperty, int rowId, int currWriteId, T value,
ReaderKey key, ReaderPair pair) {
+ assertEquals(writeId, key.getWriteId());
+ assertEquals(bucketProperty, key.getBucketProperty());
+ assertEquals(rowId, key.getRowId());
+ assertEquals(currWriteId, key.getCurrentWriteId());
+ assertEquals(value, value(pair.nextRecord()));
+ }
+
+ private static <T> void checkMergerRecord(
+ long writeId, int bucketProperty, long rowId, T value, RecordIdentifier
key, OrcStruct event) {
+ assertEquals(value, getValue(event));
+ assertEquals(writeId, key.getWriteId());
+ assertEquals(bucketProperty, key.getBucketProperty());
+ assertEquals(rowId, key.getRowId());
+ }
+
+ private static <T> void checkMergerRecord(
+ int operationType, T value, ReaderKey key, RecordIdentifier id,
OrcStruct event) {
+ checkEvent(operationType, event);
+ checkMergerRecord(key.getWriteId(), key.getBucketProperty(),
key.getRowId(),
+ value, id, event);
+ }
+
+ private static void checkEvent(int operationType, OrcStruct event) {
+ assertEquals(operationType, OrcRecordUpdater.getOperation(event));
+ if (operationType == OrcRecordUpdater.DELETE_OPERATION) {
+ assertNull(OrcRecordUpdater.getRow(event));
+ }
+ }
+
@Test
public void testReaderPair() throws Exception {
ReaderKey key = new ReaderKey();
@@ -246,68 +281,41 @@ public class TestOrcRawRecordMerger {
ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader,
minKey, maxKey,
new Reader.Options(), new HiveConf());
RecordReader recordReader = pair.getRecordReader();
- assertEquals(10, key.getWriteId());
- assertEquals(20, key.getBucketProperty());
- assertEquals(40, key.getRowId());
- assertEquals(120, key.getCurrentWriteId());
- assertEquals("third", value(pair.nextRecord()));
+ checkReaderRecord(10, 20, 40, 120, "third", key, pair);
pair.next(pair.nextRecord());
- assertEquals(40, key.getWriteId());
- assertEquals(50, key.getBucketProperty());
- assertEquals(60, key.getRowId());
- assertEquals(130, key.getCurrentWriteId());
- assertEquals("fourth", value(pair.nextRecord()));
+ checkReaderRecord(40, 50, 60, 130, "fourth", key, pair);
pair.next(pair.nextRecord());
- assertEquals(null, pair.nextRecord());
+ assertNull(pair.nextRecord());
Mockito.verify(recordReader).close();
}
@Test
- public void testReaderPairNoMin() throws Exception {
+ public void testReaderPairNoMinMaxKeys() throws Exception {
ReaderKey key = new ReaderKey();
Reader reader = createMockReader();
+ // null min and max keys forces a full scan of all records
ReaderPair pair = new OrcRawRecordMerger.ReaderPairAcid(key, reader, null,
null,
new Reader.Options(), new HiveConf());
RecordReader recordReader = pair.getRecordReader();
- assertEquals(10, key.getWriteId());
- assertEquals(20, key.getBucketProperty());
- assertEquals(20, key.getRowId());
- assertEquals(100, key.getCurrentWriteId());
- assertEquals("first", value(pair.nextRecord()));
+ checkReaderRecord(10, 20, 20, 100, "first", key, pair);
pair.next(pair.nextRecord());
- assertEquals(10, key.getWriteId());
- assertEquals(20, key.getBucketProperty());
- assertEquals(30, key.getRowId());
- assertEquals(110, key.getCurrentWriteId());
- assertEquals("second", value(pair.nextRecord()));
+ checkReaderRecord(10, 20, 30, 110, "second", key, pair);
pair.next(pair.nextRecord());
- assertEquals(10, key.getWriteId());
- assertEquals(20, key.getBucketProperty());
- assertEquals(40, key.getRowId());
- assertEquals(120, key.getCurrentWriteId());
- assertEquals("third", value(pair.nextRecord()));
+ checkReaderRecord(10, 20, 40, 120, "third", key, pair);
pair.next(pair.nextRecord());
- assertEquals(40, key.getWriteId());
- assertEquals(50, key.getBucketProperty());
- assertEquals(60, key.getRowId());
- assertEquals(130, key.getCurrentWriteId());
- assertEquals("fourth", value(pair.nextRecord()));
+ checkReaderRecord(40, 50, 60, 130, "fourth", key, pair);
pair.next(pair.nextRecord());
- assertEquals(40, key.getWriteId());
- assertEquals(50, key.getBucketProperty());
- assertEquals(61, key.getRowId());
- assertEquals(140, key.getCurrentWriteId());
- assertEquals("fifth", value(pair.nextRecord()));
+ checkReaderRecord(40, 50, 61, 140, "fifth", key, pair);
pair.next(pair.nextRecord());
- assertEquals(null, pair.nextRecord());
+ assertNull(pair.nextRecord());
Mockito.verify(recordReader).close();
}
@@ -357,22 +365,16 @@ public class TestOrcRawRecordMerger {
ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key,
reader, BUCKET, minKey, maxKey,
new Reader.Options().include(includes), new
OrcRawRecordMerger.Options().rootPath(root),
conf, new ValidReaderWriteIdList(), 0);
+
+ // min/max keys drives the record reading, skipping to the 3rd row and
stopping at the 4th
RecordReader recordReader = pair.getRecordReader();
- assertEquals(0, key.getWriteId());
- assertEquals(bucketProperty, key.getBucketProperty());
- assertEquals(2, key.getRowId());
- assertEquals(0, key.getCurrentWriteId());
- assertEquals("third", value(pair.nextRecord()));
+ checkReaderRecord(0, bucketProperty, 2, 0, "third", key, pair);
pair.next(pair.nextRecord());
- assertEquals(0, key.getWriteId());
- assertEquals(bucketProperty, key.getBucketProperty());
- assertEquals(3, key.getRowId());
- assertEquals(0, key.getCurrentWriteId());
- assertEquals("fourth", value(pair.nextRecord()));
+ checkReaderRecord(0, bucketProperty, 3, 0, "fourth", key, pair);
pair.next(pair.nextRecord());
- assertEquals(null, pair.nextRecord());
+ assertNull(pair.nextRecord());
Mockito.verify(recordReader).close();
}
@@ -381,93 +383,46 @@ public class TestOrcRawRecordMerger {
}
@Test
- public void testOriginalReaderPairNoMin() throws Exception {
+ public void testOriginalReaderPairNoMinMaxKeys() throws Exception {
int BUCKET = 10;
ReaderKey key = new ReaderKey();
Reader reader = createMockOriginalReader();
Configuration conf = new Configuration();
int bucketProperty = OrcRawRecordMerger.encodeBucketId(conf, BUCKET, 0);
FileSystem fs = FileSystem.getLocal(conf);
- Path root = new Path(tmpDir, "testOriginalReaderPairNoMin");
+ Path root = new Path(tmpDir, "testOriginalReaderPairNoMinMaxKeys");
fs.makeQualified(root);
fs.create(root);
+
+ // null min and max keys forces a full scan of all records
ReaderPair pair = new OrcRawRecordMerger.OriginalReaderPairToRead(key,
reader, BUCKET, null, null,
- new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root),
conf, new ValidReaderWriteIdList(), 0);
- assertEquals("first", value(pair.nextRecord()));
- assertEquals(0, key.getWriteId());
- assertEquals(bucketProperty, key.getBucketProperty());
- assertEquals(0, key.getRowId());
- assertEquals(0, key.getCurrentWriteId());
+ new Reader.Options(), new OrcRawRecordMerger.Options().rootPath(root),
conf,
+ new ValidReaderWriteIdList(), 0);
+ checkReaderRecord(0, bucketProperty, 0, 0, "first", key, pair);
pair.next(pair.nextRecord());
- assertEquals("second", value(pair.nextRecord()));
- assertEquals(0, key.getWriteId());
- assertEquals(bucketProperty, key.getBucketProperty());
- assertEquals(1, key.getRowId());
- assertEquals(0, key.getCurrentWriteId());
+ checkReaderRecord(0, bucketProperty, 1, 0, "second", key, pair);
pair.next(pair.nextRecord());
- assertEquals("third", value(pair.nextRecord()));
- assertEquals(0, key.getWriteId());
- assertEquals(bucketProperty, key.getBucketProperty());
- assertEquals(2, key.getRowId());
- assertEquals(0, key.getCurrentWriteId());
+ checkReaderRecord(0, bucketProperty, 2, 0, "third", key, pair);
pair.next(pair.nextRecord());
- assertEquals("fourth", value(pair.nextRecord()));
- assertEquals(0, key.getWriteId());
- assertEquals(bucketProperty, key.getBucketProperty());
- assertEquals(3, key.getRowId());
- assertEquals(0, key.getCurrentWriteId());
+ checkReaderRecord(0, bucketProperty, 3, 0, "fourth", key, pair);
pair.next(pair.nextRecord());
- assertEquals("fifth", value(pair.nextRecord()));
- assertEquals(0, key.getWriteId());
- assertEquals(bucketProperty, key.getBucketProperty());
- assertEquals(4, key.getRowId());
- assertEquals(0, key.getCurrentWriteId());
+ checkReaderRecord(0, bucketProperty, 4, 0, "fifth", key, pair);
pair.next(pair.nextRecord());
- assertEquals(null, pair.nextRecord());
+ assertNull(pair.nextRecord());
Mockito.verify(pair.getRecordReader()).close();
}
- @Test
- public void testNewBase() throws Exception {
- Configuration conf = new Configuration();
- conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, "col1");
- conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, "string");
- HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN,
true);
+ private Reader buildMockReaderMerger(List<String> columnNames,
List<OrcProto.Type.Kind> columnKinds,
+ boolean addHiveAcidKeyIndexMetadata) throws IOException {
Reader reader = Mockito.mock(Reader.class, settings);
RecordReader recordReader = Mockito.mock(RecordReader.class, settings);
- List<OrcProto.Type> types = new ArrayList<OrcProto.Type>();
- OrcProto.Type.Builder typeBuilder = OrcProto.Type.newBuilder();
- typeBuilder.setKind(OrcProto.Type.Kind.STRUCT).addSubtypes(1)
- .addSubtypes(2).addSubtypes(3).addSubtypes(4).addSubtypes(5)
- .addSubtypes(6);
- typeBuilder.addAllFieldNames(Lists.newArrayList(
- OrcRecordUpdater.OPERATION_FIELD_NAME,
- OrcRecordUpdater.CURRENT_WRITEID_FIELD_NAME,
- OrcRecordUpdater.BUCKET_FIELD_NAME,
- OrcRecordUpdater.ROW_ID_FIELD_NAME,
- OrcRecordUpdater.CURRENT_WRITEID_FIELD_NAME,
- OrcRecordUpdater.ROW_FIELD_NAME));
- types.add(typeBuilder.build());
- types.add(null);
- types.add(null);
- types.add(null);
- types.add(null);
- types.add(null);
- typeBuilder.clearSubtypes();
- typeBuilder.addSubtypes(7);
- typeBuilder.addAllFieldNames(Lists.newArrayList("col1"));
- types.add(typeBuilder.build());
- typeBuilder.clear();
- typeBuilder.setKind(OrcProto.Type.Kind.STRING);
- types.add(typeBuilder.build());
-
- when(reader.getTypes()).thenReturn(types);
+ when(reader.getTypes()).thenReturn(buildReaderTypes(columnNames,
columnKinds));
when(reader.rowsOptions(any(Reader.Options.class), any()))
.thenReturn(recordReader);
@@ -482,57 +437,165 @@ public class TestOrcRawRecordMerger {
OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS);
setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth");
- when(recordReader.hasNext()).
- thenReturn(true, true, true, true, true, false);
-
when(recordReader.getProgress()).thenReturn(1.0f);
- when(recordReader.next(null)).thenReturn(row1, row4);
- when(recordReader.next(row1)).thenReturn(row2);
- when(recordReader.next(row2)).thenReturn(row3);
- when(recordReader.next(row3)).thenReturn(row5);
+ if (addHiveAcidKeyIndexMetadata) {
+ // hasNext() and next() need re-checking after fixing
https://issues.apache.org/jira/browse/HIVE-26150
+ when(recordReader.hasNext()).
+ thenReturn(true, true, true, true, true, false);
+
+ when(recordReader.next(null)).thenReturn(row1, row4);
+ when(recordReader.next(row1)).thenReturn(row2);
+ when(recordReader.next(row2)).thenReturn(row3);
+ when(recordReader.next(row3)).thenReturn(row5);
+ } else {
+ // it should be (true x 4, false), see
https://issues.apache.org/jira/browse/HIVE-26150
+ when(recordReader.hasNext()).
+ thenReturn(true, true, true, true, true, true, true, true, true,
false);
+
+ when(recordReader.next(null)).thenReturn(row1);
+ when(recordReader.next(row1)).thenReturn(row2);
+ when(recordReader.next(row2)).thenReturn(row3);
+ when(recordReader.next(row3)).thenReturn(row4);
+ when(recordReader.next(row4)).thenReturn(row5);
+ }
when(reader.hasMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
- .thenReturn(true);
- when(reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
- .thenReturn(ByteBuffer.wrap("10,20,30;40,50,60;40,50,61"
- .getBytes("UTF-8")));
- when(reader.getStripes())
- .thenReturn(createStripes(2, 2, 1));
-
- OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader,
+ .thenReturn(addHiveAcidKeyIndexMetadata);
+ if (addHiveAcidKeyIndexMetadata) {
+ when(reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME))
+ .thenReturn(ByteBuffer.wrap("10,20,30;40,50,60;40,50,61".getBytes(StandardCharsets.UTF_8)));
+ }
+ when(reader.getStripes()).thenReturn(createStripes(2, 2, 1));
+
+ return reader;
+ }
+
+ @Test
+ public void testNewBase() throws Exception {
+ final List<String> columnNames = Collections.singletonList("col1");
+ final List<String> columnTypes = Collections.singletonList("string");
+ final List<OrcProto.Type.Kind> columnKinds =
Collections.singletonList(OrcProto.Type.Kind.STRING);
+
+ final Configuration conf = new Configuration();
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNames.get(0));
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypes.get(0));
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN,
true);
+
+ final Reader reader = buildMockReaderMerger(columnNames, columnKinds,
true);
+
+ final OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false,
reader,
false, 10, createMaximalTxnList(),
new Reader.Options().range(1000, 1000), null, new
OrcRawRecordMerger.Options());
- RecordReader rr = merger.getCurrentReader().getRecordReader();
+ final RecordReader rr = merger.getCurrentReader().getRecordReader();
+
assertEquals(0, merger.getOtherReaders().size());
+ assertEquals(merger.getMinKey().toString(), new RecordIdentifier(10, 20,
30), merger.getMinKey());
+ assertEquals(merger.getMaxKey().toString(), new RecordIdentifier(40, 50,
60), merger.getMaxKey());
+
+ final RecordIdentifier id = merger.createKey();
+ final OrcStruct event = merger.createValue();
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(10, 20, 40, "third", id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(40, 50, 60, "fourth", id, event);
+
+ assertFalse(merger.next(id, event));
+ assertEquals(1.0, merger.getProgress(), 0.01);
+ merger.close();
+ Mockito.verify(rr).close();
+ Mockito.verify(rr).getProgress();
+
+ assertRecordMergerStructFields(merger, columnNames);
+ }
- assertEquals("" + merger.getMinKey(),new RecordIdentifier(10, 20, 30),
merger.getMinKey());
- assertEquals("" + merger.getMaxKey(), new RecordIdentifier(40, 50, 60),
merger.getMaxKey());
+ @Test
+ public void testNewBaseNoHiveAcidKeyIndexMetadata() throws Exception {
+ final List<String> columnNames = Collections.singletonList("col1");
+ final List<String> columnTypes = Collections.singletonList("string");
+ final List<OrcProto.Type.Kind> columnKinds =
Collections.singletonList(OrcProto.Type.Kind.STRING);
+
+ final Configuration conf = new Configuration();
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, columnNames.get(0));
+ conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, columnTypes.get(0));
+ HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN,
true);
+
+ final Reader reader = buildMockReaderMerger(columnNames, columnKinds,
false);
+ final OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, true,
reader,
+ false, 10, createMaximalTxnList(),
+ new Reader.Options().range(1000, 1000), null, new
OrcRawRecordMerger.Options());
+ final RecordReader rr = merger.getCurrentReader().getRecordReader();
+ assertEquals(0, merger.getOtherReaders().size());
+
+ assertNull(merger.getMinKey());
+ assertNull(merger.getMaxKey());
RecordIdentifier id = merger.createKey();
OrcStruct event = merger.createValue();
- assertEquals(true, merger.next(id, event));
- assertEquals(10, id.getWriteId());
- assertEquals(20, id.getBucketProperty());
- assertEquals(40, id.getRowId());
- assertEquals("third", getValue(event));
+ // minKey/maxKey = null => full record scan
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(10, 20, 20, "first", id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(10, 20, 30, "second", id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(10, 20, 40, "third", id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(40, 50, 60, "fourth", id, event);
- assertEquals(true, merger.next(id, event));
- assertEquals(40, id.getWriteId());
- assertEquals(50, id.getBucketProperty());
- assertEquals(60, id.getRowId());
- assertEquals("fourth", getValue(event));
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(40, 50, 61, "fifth", id, event);
- assertEquals(false, merger.next(id, event));
assertEquals(1.0, merger.getProgress(), 0.01);
merger.close();
Mockito.verify(rr).close();
Mockito.verify(rr).getProgress();
+ assertRecordMergerStructFields(merger, columnNames);
+ }
+
+ private static List<OrcProto.Type> buildReaderTypes(List<String> columns,
List<OrcProto.Type.Kind> columnTypes) {
+ final List<OrcProto.Type> types = new ArrayList<>();
+ final OrcProto.Type.Builder typeBuilder = OrcProto.Type.newBuilder();
+
+ typeBuilder.setKind(OrcProto.Type.Kind.STRUCT).addSubtypes(1)
+ .addSubtypes(2).addSubtypes(3).addSubtypes(4).addSubtypes(5)
+ .addSubtypes(6);
+ typeBuilder.addAllFieldNames(Lists.newArrayList(
+ OrcRecordUpdater.OPERATION_FIELD_NAME,
+ OrcRecordUpdater.CURRENT_WRITEID_FIELD_NAME,
+ OrcRecordUpdater.BUCKET_FIELD_NAME,
+ OrcRecordUpdater.ROW_ID_FIELD_NAME,
+ OrcRecordUpdater.CURRENT_WRITEID_FIELD_NAME,
+ OrcRecordUpdater.ROW_FIELD_NAME));
+
+ types.add(typeBuilder.build());
+ types.add(null);
+ types.add(null);
+ types.add(null);
+ types.add(null);
+ types.add(null);
+ typeBuilder.clearSubtypes();
+ typeBuilder.addSubtypes(7);
+ typeBuilder.addAllFieldNames(columns);
+ types.add(typeBuilder.build());
+ typeBuilder.clear();
+ columnTypes.forEach(typeBuilder::setKind);
+ types.add(typeBuilder.build());
+
+ return types;
+ }
+
+ private static void assertRecordMergerStructFields(OrcRawRecordMerger
merger, List<String> columns) {
StructObjectInspector eventObjectInspector =
(StructObjectInspector) merger.getObjectInspector();
List<? extends StructField> fields =
eventObjectInspector.getAllStructFieldRefs();
+
+ // check ACID (internal) columns
assertEquals(OrcRecordUpdater.FIELDS, fields.size());
assertEquals(OrcRecordUpdater.OPERATION_FIELD_NAME,
fields.get(OrcRecordUpdater.OPERATION).getFieldName());
@@ -547,8 +610,12 @@ public class TestOrcRawRecordMerger {
StructObjectInspector rowObjectInspector =
(StructObjectInspector) fields.get(OrcRecordUpdater.ROW)
.getFieldObjectInspector();
- assertEquals("col1",
- rowObjectInspector.getAllStructFieldRefs().get(0).getFieldName());
+
+ // check actual row columns (that is, user data)
+ for (int i = 0; i < columns.size(); i++) {
+ assertEquals(columns.get(i),
+ rowObjectInspector.getAllStructFieldRefs().get(i).getFieldName());
+ }
}
static class MyRow {
@@ -574,7 +641,8 @@ public class TestOrcRawRecordMerger {
}
static String getValue(OrcStruct event) {
- return OrcRecordUpdater.getRow(event).getFieldValue(0).toString();
+ OrcStruct struct = OrcRecordUpdater.getRow(event);
+ return struct == null ? null : struct.getFieldValue(0).toString();
}
@Rule
@@ -661,7 +729,7 @@ public class TestOrcRawRecordMerger {
AcidUtils.getPaths(directory.getCurrentDirectories()), new
OrcRawRecordMerger.Options().isCompacting(false));
RecordIdentifier key = merger.createKey();
OrcStruct value = merger.createValue();
- assertEquals(false, merger.next(key, value));
+ assertFalse(merger.next(key, value));
}
/**
@@ -745,72 +813,52 @@ public class TestOrcRawRecordMerger {
new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
createMaximalTxnList(), new Reader.Options(),
new Path[] {deleteDeltaDir}, new
OrcRawRecordMerger.Options().isCompacting(false));
- assertEquals(null, merger.getMinKey());
- assertEquals(null, merger.getMaxKey());
+ assertNull(merger.getMinKey());
+ assertNull(merger.getMaxKey());
RecordIdentifier id = merger.createKey();
OrcStruct event = merger.createValue();
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
- assertEquals("second", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 4, 0), id);
- assertEquals("fifth", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 5, 0), id);
- assertEquals("sixth", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 6, 0), id);
- assertEquals("seventh", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id);
- assertEquals("tenth", getValue(event));
-
- assertEquals(false, merger.next(id, event));
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "second",
+ new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "fifth",
+ new ReaderKey(0, BUCKET_PROPERTY, 4, 0), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "sixth",
+ new ReaderKey(0, BUCKET_PROPERTY, 5, 0), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "seventh",
+ new ReaderKey(0, BUCKET_PROPERTY, 6, 0), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "tenth",
+ new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id, event);
+
+ assertFalse(merger.next(id, event));
merger.close();
//second "split" is delta_200_200
@@ -820,58 +868,42 @@ public class TestOrcRawRecordMerger {
new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET,
createMaximalTxnList(), new Reader.Options(),
new Path[] {deleteDeltaDir}, new
OrcRawRecordMerger.Options().isCompacting(false));
- assertEquals(null, merger.getMinKey());
- assertEquals(null, merger.getMaxKey());
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id);
- assertEquals("update 1", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id);
- assertEquals("update 2", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id);
- assertEquals("update 3", getValue(event));
-
- assertEquals(false, merger.next(id, event));
+ assertNull(merger.getMinKey());
+ assertNull(merger.getMaxKey());
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "update 1",
+ new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "update 2",
+ new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "update 3",
+ new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id, event);
+
+ assertFalse(merger.next(id, event));
merger.close();
//now run as if it's a minor Compaction so we don't collapse events
@@ -880,61 +912,45 @@ public class TestOrcRawRecordMerger {
new OrcRawRecordMerger(conf, false, null, false, BUCKET,
createMaximalTxnList(), new Reader.Options(),
AcidUtils.getPaths(directory.getCurrentDirectories()), new
OrcRawRecordMerger.Options().isCompacting(true));
- assertEquals(null, merger.getMinKey());
- assertEquals(null, merger.getMaxKey());
+ assertNull(merger.getMinKey());
+ assertNull(merger.getMaxKey());
- assertEquals(true, merger.next(id, event));
+ assertTrue(merger.next(id, event));
//minor comp, so we ignore 'base_0000100' files so all Deletes end up first since
// they all modify primordial rows
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id, event);
//data from delta_200_200
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id);
- assertEquals("update 1", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id);
- assertEquals("update 2", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id);
- assertEquals("update 3", getValue(event));
-
- assertEquals(false, merger.next(id, event));
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "update 1",
+ new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "update 2",
+ new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "update 3",
+ new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id, event);
+
+ assertFalse(merger.next(id, event));
merger.close();
//now run as if it's a major Compaction so we collapse events
@@ -946,89 +962,63 @@ public class TestOrcRawRecordMerger {
createMaximalTxnList(), new Reader.Options(),
AcidUtils.getPaths(directory.getCurrentDirectories()), new
OrcRawRecordMerger.Options()
.isCompacting(true).isMajorCompaction(true).baseDir(new Path(root,
"base_0000100")));
- assertEquals(null, merger.getMinKey());
- assertEquals(null, merger.getMaxKey());
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id);
- assertEquals("second", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 4, 0), id);
- assertEquals("fifth", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 5, 0), id);
- assertEquals("sixth", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 6, 0), id);
- assertEquals("seventh", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.DELETE_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id);
- assertNull(OrcRecordUpdater.getRow(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id);
- assertEquals("tenth", getValue(event));
+ assertNull(merger.getMinKey());
+ assertNull(merger.getMaxKey());
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 0, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "second",
+ new ReaderKey(0, BUCKET_PROPERTY, 1, 0), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 2, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 3, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "fifth",
+ new ReaderKey(0, BUCKET_PROPERTY, 4, 0), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "sixth",
+ new ReaderKey(0, BUCKET_PROPERTY, 5, 0), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "seventh",
+ new ReaderKey(0, BUCKET_PROPERTY, 6, 0), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 7, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.DELETE_OPERATION, null,
+ new ReaderKey(0, BUCKET_PROPERTY, 8, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "tenth",
+ new ReaderKey(0, BUCKET_PROPERTY, 9, 0), id, event);
//data from delta_200_200
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id);
- assertEquals("update 1", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id);
- assertEquals("update 2", getValue(event));
-
- assertEquals(true, merger.next(id, event));
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id);
- assertEquals("update 3", getValue(event));
-
- assertEquals(false, merger.next(id, event));
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "update 1",
+ new ReaderKey(200, BUCKET_PROPERTY, 0, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "update 2",
+ new ReaderKey(200, BUCKET_PROPERTY, 1, 200), id, event);
+
+ assertTrue(merger.next(id, event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, "update 3",
+ new ReaderKey(200, BUCKET_PROPERTY, 2, 200), id, event);
+
+ assertFalse(merger.next(id, event));
merger.close();
// try ignoring the 200 transaction and make sure it works still
@@ -1041,18 +1031,16 @@ public class TestOrcRawRecordMerger {
writeIds, new Reader.Options(),
new Path[] {deleteDeltaDir}, new
OrcRawRecordMerger.Options().isCompacting(false));
- assertEquals(null, merger.getMinKey());
- assertEquals(null, merger.getMaxKey());
+ assertNull(merger.getMinKey());
+ assertNull(merger.getMaxKey());
for(int i=0; i < values.length; ++i) {
- assertEquals(true, merger.next(id, event));
+ assertTrue(merger.next(id, event));
LOG.info("id = " + id + "event = " + event);
- assertEquals(OrcRecordUpdater.INSERT_OPERATION,
- OrcRecordUpdater.getOperation(event));
- assertEquals(new ReaderKey(0, BUCKET_PROPERTY, i, 0), id);
- assertEquals(values[i], getValue(event));
+ checkMergerRecord(OrcRecordUpdater.INSERT_OPERATION, values[i],
+ new ReaderKey(0, BUCKET_PROPERTY, i, 0), id, event);
}
- assertEquals(false, merger.next(id, event));
+ assertFalse(merger.next(id, event));
merger.close();
// 2nd split is for delta_200_200 which is filtered out entirely by "txns"
@@ -1063,9 +1051,9 @@ public class TestOrcRawRecordMerger {
writeIds, new Reader.Options(),
new Path[] {deleteDeltaDir}, new
OrcRawRecordMerger.Options().isCompacting(false));
- assertEquals(null, merger.getMinKey());
- assertEquals(null, merger.getMaxKey());
- assertEquals(false, merger.next(id, event));
+ assertNull(merger.getMinKey());
+ assertNull(merger.getMaxKey());
+ assertFalse(merger.next(id, event));
merger.close();
}
@@ -1217,12 +1205,12 @@ public class TestOrcRawRecordMerger {
}
if(i >= expected.length) {
//not found
- assertTrue("Found unexpected row: " + mr, false );
+ fail("Found unexpected row: " + mr);
}
}
}
for(MyResult mr : expected) {
- assertTrue("Expected " + mr + " not found in any InputSplit", mr ==
null);
+ assertNull("Expected " + mr + " not found in any InputSplit", mr);
}
}
@@ -1341,12 +1329,12 @@ public class TestOrcRawRecordMerger {
}
if(i >= expected.length) {
//not found
- assertTrue("Found unexpected row: " + mr, false );
+ fail("Found unexpected row: " + mr);
}
}
}
for(MyResult mr : expected) {
- assertTrue("Expected " + mr + " not found in any InputSplit", mr ==
null);
+ assertNull("Expected " + mr + " not found in any InputSplit", mr);
}
}
private static final class MyResult {
@@ -1431,10 +1419,10 @@ public class TestOrcRawRecordMerger {
for (int i = 0; i < values[j].length; ++i) {
System.out.println("Checking " + i);
String msg = "split[" + j + "] at i=" + i;
- assertEquals(msg, true, rr.next(NullWritable.get(), row));
+ assertTrue(msg, rr.next(NullWritable.get(), row));
assertEquals(msg, values[j][i], row.getFieldValue(0).toString());
}
- assertEquals(false, rr.next(NullWritable.get(), row));
+ assertFalse(rr.next(NullWritable.get(), row));
}
}
@@ -1474,8 +1462,8 @@ public class TestOrcRawRecordMerger {
}
RecordUpdater ru = of.getRecordUpdater(root, options);
String[] values= new String[]{"1", "2", "3", "4", "5"};
- for(int i=0; i < values.length; ++i) {
- ru.insert(0, new MyRow(values[i]));
+ for (String s : values) {
+ ru.insert(0, new MyRow(s));
}
ru.close(false);
@@ -1484,8 +1472,8 @@ public class TestOrcRawRecordMerger {
.maximumWriteId(19);
ru = of.getRecordUpdater(root, options);
values = new String[]{"6", "7", "8"};
- for(int i=0; i < values.length; ++i) {
- ru.insert(1, new MyRow(values[i]));
+ for (String s : values) {
+ ru.insert(1, new MyRow(s));
}
InputFormat inf = new OrcInputFormat();
JobConf job = new JobConf();
@@ -1510,16 +1498,16 @@ public class TestOrcRawRecordMerger {
System.out.println("Looking at split " + splits[0]);
for(int i=1; i < 6; ++i) {
System.out.println("Checking row " + i);
- assertEquals(true, rr.next(key, value));
+ assertTrue(rr.next(key, value));
assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
}
- assertEquals(false, rr.next(key, value));
+ assertFalse(rr.next(key, value));
ru.flush();
ru.flush();
values = new String[]{"9", "10"};
- for(int i=0; i < values.length; ++i) {
- ru.insert(3, new MyRow(values[i]));
+ for (String s : values) {
+ ru.insert(3, new MyRow(s));
}
ru.flush();
@@ -1527,22 +1515,22 @@ public class TestOrcRawRecordMerger {
assertEquals(2, splits.length);
Path sideFile = new Path(root + "/" + (use130Format ?
AcidUtils.deltaSubdir(10,19,0) :
AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length");
- assertEquals(true, fs.exists(sideFile));
+ assertTrue(fs.exists(sideFile));
assertEquals(32, fs.getFileStatus(sideFile).getLen());
rr = inf.getRecordReader(splits[0], job, Reporter.NULL);
for(int i=1; i <= 5; ++i) {
- assertEquals(true, rr.next(key, value));
+ assertTrue(rr.next(key, value));
assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
}
- assertEquals(false, rr.next(key, value));
+ assertFalse(rr.next(key, value));
rr = inf.getRecordReader(splits[1], job, Reporter.NULL);
for(int i=6; i < 11; ++i) {
- assertEquals("i="+ i, true, rr.next(key, value));
+ assertTrue("i=" + i, rr.next(key, value));
assertEquals(Integer.toString(i), value.getFieldValue(0).toString());
}
- assertEquals(false, rr.next(key, value));
+ assertFalse(rr.next(key, value));
}
}