This is an automated email from the ASF dual-hosted git repository.
jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new aed7500 HIVE-21905: Generics improvement around the FetchOperator class (Ivan Suller via Jesus Camacho Rodriguez)
aed7500 is described below
commit aed75009b899ea889ad231586ecfce5118aaeae6
Author: Ivan Suller <[email protected]>
AuthorDate: Tue Jun 25 10:43:38 2019 -0700
HIVE-21905: Generics improvement around the FetchOperator class (Ivan Suller via Jesus Camacho Rodriguez)
---
.../apache/hadoop/hive/ql/exec/FetchOperator.java | 12 ++--
.../apache/hadoop/hive/ql/exec/FooterBuffer.java | 10 ++--
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 5 +-
.../hive/ql/io/HiveContextAwareRecordReader.java | 17 +++---
.../apache/hadoop/hive/ql/io/HiveRecordReader.java | 11 ++--
.../ql/io/TestHiveBinarySearchRecordReader.java | 70 +++++++++++-----------
6 files changed, 62 insertions(+), 63 deletions(-)
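For context before the per-file diffs: the theme of the patch is replacing raw RecordReader and ObjectPair uses with parameterized ones, returning a List instead of a FetchInputFormatSplit[], and deleting the casts and @SuppressWarnings("unchecked") annotations the raw types forced. A minimal sketch of the raw-versus-parameterized difference, using a hypothetical Pair class rather than code from the patch:

    import java.util.ArrayList;
    import java.util.List;

    class Pair<K, V> {
      private K first;
      public void setFirst(K first) { this.first = first; }
      public K getFirst() { return first; }
    }

    class RawVsGenericDemo {
      public static void main(String[] args) {
        // Raw type: getFirst() returns Object, so a cast (and an
        // unchecked warning) is needed at every use site.
        Pair raw = new Pair();
        raw.setFirst("key");
        String k1 = (String) raw.getFirst();

        // Parameterized type: the compiler tracks K, no cast needed.
        Pair<String, Long> typed = new Pair<>();
        typed.setFirst("key");
        String k2 = typed.getFirst();

        List<Pair<String, Long>> buffer = new ArrayList<>();  // diamond, as in the patch
        buffer.add(typed);
        System.out.println(k1 + " " + k2 + " " + buffer.size());
      }
    }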
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
index 3550747..c8fa1b9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
@@ -214,7 +214,6 @@ public class FetchOperator implements Serializable {
*/
private static final Map<String, InputFormat> inputFormats = new HashMap<String, InputFormat>();
- @SuppressWarnings("unchecked")
public static InputFormat getInputFormatFromCache(
Class<? extends InputFormat> inputFormatClass, Configuration conf)
throws IOException {
if (Configurable.class.isAssignableFrom(inputFormatClass) ||
@@ -319,7 +318,7 @@ public class FetchOperator implements Serializable {
private RecordReader<WritableComparable, Writable> getRecordReader() throws Exception {
if (!iterSplits.hasNext()) {
- FetchInputFormatSplit[] splits = getNextSplits();
+ List<FetchInputFormatSplit> splits = getNextSplits();
if (splits == null) {
return null;
}
@@ -335,7 +334,7 @@ public class FetchOperator implements Serializable {
if (isPartitioned) {
row[1] = createPartValue(currDesc, partKeyOI);
}
- iterSplits = Arrays.asList(splits).iterator();
+ iterSplits = splits.iterator();
if (LOG.isDebugEnabled()) {
LOG.debug("Creating fetchTask with deserializer typeinfo: "
@@ -348,7 +347,6 @@ public class FetchOperator implements Serializable {
final FetchInputFormatSplit target = iterSplits.next();
- @SuppressWarnings("unchecked")
final RecordReader<WritableComparable, Writable> reader =
target.getRecordReader(job);
if (hasVC || work.getSplitSample() != null) {
currRecReader = new HiveRecordReader<WritableComparable, Writable>(reader, job) {
@@ -374,7 +372,7 @@ public class FetchOperator implements Serializable {
return currRecReader;
}
- protected FetchInputFormatSplit[] getNextSplits() throws Exception {
+ private List<FetchInputFormatSplit> getNextSplits() throws Exception {
while (getNextPath()) {
// not using FileInputFormat.setInputPaths() here because it forces a connection to the
// default file system - which may or may not be online during pure metadata operations
@@ -437,7 +435,7 @@ public class FetchOperator implements Serializable {
if (HiveConf.getBoolVar(job, HiveConf.ConfVars.HIVE_IN_TEST)) {
Collections.sort(inputSplits, new FetchInputFormatSplitComparator());
}
- return inputSplits.toArray(new FetchInputFormatSplit[inputSplits.size()]);
+ return inputSplits;
}
return null;
@@ -663,7 +661,6 @@ public class FetchOperator implements Serializable {
*/
public void setupContext(List<Path> paths) {
this.iterPath = paths.iterator();
- List<PartitionDesc> partitionDescs;
if (!isPartitioned) {
this.iterPartDesc = Iterators.cycle(new PartitionDesc(work.getTblDesc(), null));
} else {
@@ -689,7 +686,6 @@ public class FetchOperator implements Serializable {
}
partKeyOI = getPartitionKeyOI(tableDesc);
- PartitionDesc partDesc = new PartitionDesc(tableDesc, null);
List<PartitionDesc> listParts = work.getPartDesc();
// Choose the table descriptor if none of the partitions is present.
// For example, consider the query:
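Two of the FetchOperator changes above work together: getNextSplits() now returns a List<FetchInputFormatSplit> instead of an array, so getRecordReader() can call splits.iterator() directly rather than wrapping with Arrays.asList(...), and the toArray(...) copy disappears. A small illustration of that before/after shape, with plain strings standing in for splits (not code from the patch):

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    class SplitIterationDemo {
      public static void main(String[] args) {
        // Before: an array result has to be wrapped to obtain an Iterator.
        String[] asArray = {"split-0", "split-1"};
        Iterator<String> it1 = Arrays.asList(asArray).iterator();

        // After: a List result is iterable as-is, with no defensive copy.
        List<String> asList = Arrays.asList("split-0", "split-1");
        Iterator<String> it2 = asList.iterator();

        while (it1.hasNext() && it2.hasNext()) {
          System.out.println(it1.next() + " == " + it2.next());
        }
      }
    }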
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java
index be88dad0..8ead797 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FooterBuffer.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.ReflectionUtils;
public class FooterBuffer {
- private ArrayList<ObjectPair> buffer;
+ private ArrayList<ObjectPair<WritableComparable, Writable>> buffer;
private int cur;
public FooterBuffer() {
@@ -64,13 +64,13 @@ public class FooterBuffer {
int footerCount, WritableComparable key, Writable value) throws IOException {
// Fill the buffer with key value pairs.
- this.buffer = new ArrayList<ObjectPair>();
+ this.buffer = new ArrayList<>();
while (buffer.size() < footerCount) {
boolean notEOF = recordreader.next(key, value);
if (!notEOF) {
return false;
}
- ObjectPair tem = new ObjectPair();
+ ObjectPair<WritableComparable, Writable> tem = new ObjectPair<>();
tem.setFirst(ReflectionUtils.copy(job, key, tem.getFirst()));
tem.setSecond(ReflectionUtils.copy(job, value, tem.getSecond()));
buffer.add(tem);
@@ -98,8 +98,8 @@ public class FooterBuffer {
*/
public boolean updateBuffer(JobConf job, RecordReader recordreader,
WritableComparable key, Writable value) throws IOException {
- key = ReflectionUtils.copy(job, (WritableComparable)buffer.get(cur).getFirst(), key);
- value = ReflectionUtils.copy(job, (Writable)buffer.get(cur).getSecond(), value);
+ key = ReflectionUtils.copy(job, buffer.get(cur).getFirst(), key);
+ value = ReflectionUtils.copy(job, buffer.get(cur).getSecond(), value);
boolean notEOF = recordreader.next(buffer.get(cur).getFirst(), buffer.get(cur).getSecond());
if (notEOF) {
cur = (++cur) % buffer.size();
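Parameterizing the buffer as ArrayList<ObjectPair<WritableComparable, Writable>> is what lets updateBuffer() above drop its (WritableComparable) and (Writable) casts: the element type now tells the compiler what getFirst() and getSecond() return. The same effect shown with a standard-library pair type standing in for ObjectPair (illustrative only):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.ArrayList;
    import java.util.List;

    class TypedBufferDemo {
      public static void main(String[] args) {
        // With a raw element type, getKey()/getValue() return Object and
        // every caller must cast. A parameterized element type removes that.
        List<SimpleEntry<String, Integer>> buffer = new ArrayList<>();
        buffer.add(new SimpleEntry<>("row-key", 42));

        String key = buffer.get(0).getKey();       // no cast needed
        Integer value = buffer.get(0).getValue();  // no cast needed
        System.out.println(key + " -> " + value);
      }
    }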
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index d91cd60..bc75ec0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -183,7 +183,6 @@ import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.FileInputFormat;
@@ -3823,8 +3822,8 @@ public final class Utilities {
* @return Return true if there are 0 or more records left in the file
* after skipping all headers, otherwise return false.
*/
- public static boolean skipHeader(RecordReader<WritableComparable, Writable> currRecReader,
- int headerCount, WritableComparable key, Writable value) throws IOException {
+ public static <K, V> boolean skipHeader(RecordReader<K, V> currRecReader, int headerCount, K key, V value)
+ throws IOException {
while (headerCount > 0) {
if (!currRecReader.next(key, value)) {
return false;
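Making skipHeader generic means K and V are inferred at each call site, so HiveContextAwareRecordReader can pass its own key and value straight through instead of casting to WritableComparable/Writable. A self-contained sketch of that signature shape, with a toy Reader interface in place of Hadoop's RecordReader; the loop is a plausible reading of the visible method body, not the Hive code itself:

    import java.io.IOException;

    class SkipHeaderDemo {
      interface Reader<K, V> {
        boolean next(K key, V value) throws IOException;
      }

      static <K, V> boolean skipHeader(Reader<K, V> reader, int headerCount, K key, V value)
          throws IOException {
        while (headerCount > 0) {
          if (!reader.next(key, value)) {
            return false;  // ran out of records while still inside the header
          }
          headerCount--;
        }
        return true;
      }

      public static void main(String[] args) throws IOException {
        Reader<StringBuilder, StringBuilder> r = (k, v) -> true;  // never-empty input
        System.out.println(skipHeader(r, 2, new StringBuilder(), new StringBuilder()));
      }
    }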
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
index 0287bd3..7f3ef37 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveContextAwareRecordReader.java
@@ -54,7 +54,8 @@ import org.apache.hadoop.mapred.RecordReader;
* a binary search to find the block to begin reading from, and stop reading once it can be
* determined no other entries will match the filter.
*/
-public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader<K, V> {
+public abstract class HiveContextAwareRecordReader<K extends WritableComparable, V extends Writable>
+ implements RecordReader<K, V> {
private static final Logger LOG =
LoggerFactory.getLogger(HiveContextAwareRecordReader.class.getName());
@@ -68,7 +69,7 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader
private final List<Comparison> stopComparisons = new ArrayList<Comparison>();
private Map<Path, PartitionDesc> pathToPartitionInfo;
- protected RecordReader recordReader;
+ protected RecordReader<K, V> recordReader;
protected JobConf jobConf;
protected boolean isSorted = false;
@@ -76,17 +77,17 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader
this(null, conf);
}
- public HiveContextAwareRecordReader(RecordReader recordReader) {
+ public HiveContextAwareRecordReader(RecordReader<K, V> recordReader) {
this.recordReader = recordReader;
}
- public HiveContextAwareRecordReader(RecordReader recordReader, JobConf conf)
+ public HiveContextAwareRecordReader(RecordReader<K, V> recordReader, JobConf conf)
throws IOException {
this.recordReader = recordReader;
this.jobConf = conf;
}
- public void setRecordReader(RecordReader recordReader) {
+ public void setRecordReader(RecordReader<K, V> recordReader) {
this.recordReader = recordReader;
}
@@ -345,12 +346,12 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader
}
// If input contains header, skip header.
- if (!Utilities.skipHeader(recordReader, headerCount, (WritableComparable)key, (Writable)value)) {
+ if (!Utilities.skipHeader(recordReader, headerCount, key, value)) {
return false;
}
if (footerCount > 0) {
footerBuffer = new FooterBuffer();
- if (!footerBuffer.initializeBuffer(jobConf, recordReader, footerCount, (WritableComparable)key, (Writable)value)) {
+ if (!footerBuffer.initializeBuffer(jobConf, recordReader, footerCount, key, value)) {
return false;
}
}
@@ -360,7 +361,7 @@ public abstract class HiveContextAwareRecordReader<K, V> implements RecordReader
// Table files don't have footer rows.
return recordReader.next(key, value);
} else {
- return footerBuffer.updateBuffer(jobConf, recordReader, (WritableComparable)key, (Writable)value);
+ return footerBuffer.updateBuffer(jobConf, recordReader, key, value);
}
} catch (Exception e) {
return HiveIOExceptionHandlerUtil.handleRecordReaderNextException(e, jobConf);
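The bound added above, K extends WritableComparable and V extends Writable, is what makes the call sites in this hunk typecheck without casts: once the compiler knows every K is a WritableComparable, passing key where skipHeader or FooterBuffer expects one is statically safe. The mechanism in miniature, with toy marker interfaces instead of Hadoop's (hypothetical names):

    class BoundedTypeDemo {
      interface Cmp { }  // stands in for WritableComparable
      interface Wrt { }  // stands in for Writable

      static void requireCmp(Cmp c) { }
      static void requireWrt(Wrt w) { }

      // An unbounded K is just an Object to the compiler; bounding it lets
      // K flow into Cmp-typed parameters with no cast.
      static class Reader<K extends Cmp, V extends Wrt> {
        void next(K key, V value) {
          requireCmp(key);
          requireWrt(value);
        }
      }

      public static void main(String[] args) {
        class Key implements Cmp { }
        class Val implements Wrt { }
        new Reader<Key, Val>().next(new Key(), new Val());
      }
    }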
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
index 3864060..359184a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveRecordReader.java
@@ -35,12 +35,12 @@ public class HiveRecordReader<K extends WritableComparable, V extends Writable>
- public HiveRecordReader(RecordReader recordReader)
+ public HiveRecordReader(RecordReader<K, V> recordReader)
throws IOException {
super(recordReader);
}
- public HiveRecordReader(RecordReader recordReader, JobConf conf)
+ public HiveRecordReader(RecordReader<K, V> recordReader, JobConf conf)
throws IOException {
super(recordReader, conf);
}
@@ -50,14 +50,17 @@ public class HiveRecordReader<K extends WritableComparable, V extends Writable>
recordReader.close();
}
+ @Override
public K createKey() {
- return (K) recordReader.createKey();
+ return recordReader.createKey();
}
+ @Override
public V createValue() {
- return (V) recordReader.createValue();
+ return recordReader.createValue();
}
+ @Override
public long getPos() throws IOException {
return recordReader.getPos();
}
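With the delegate field typed as RecordReader<K, V>, createKey() and createValue() already return K and V, so the unchecked (K)/(V) casts above become redundant, and the added @Override annotations let the compiler verify the delegation contract. The same wrapper pattern in a toy form (hypothetical interface, not Hadoop's):

    class DelegationDemo {
      interface Reader<K, V> {
        K createKey();
        V createValue();
      }

      // A typed delegate means the wrapper simply forwards, with no casts.
      static class Wrapper<K, V> implements Reader<K, V> {
        private final Reader<K, V> delegate;
        Wrapper(Reader<K, V> delegate) { this.delegate = delegate; }
        @Override public K createKey() { return delegate.createKey(); }
        @Override public V createValue() { return delegate.createValue(); }
      }

      public static void main(String[] args) {
        Reader<String, Long> base = new Reader<String, Long>() {
          @Override public String createKey() { return "k"; }
          @Override public Long createValue() { return 0L; }
        };
        System.out.println(new Wrapper<>(base).createKey());
      }
    }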
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
index 1e46c60..3c53878 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/TestHiveBinarySearchRecordReader.java
@@ -59,7 +59,7 @@ public class TestHiveBinarySearchRecordReader extends TestCase {
private RCFileRecordReader rcfReader;
private JobConf conf;
private TestHiveInputSplit hiveSplit;
- private HiveContextAwareRecordReader hbsReader;
+ private HiveContextAwareRecordReader<WritableComparable, Writable> hbsReader;
private IOContext ioContext;
private static class TestHiveInputSplit extends HiveInputSplit {
@@ -148,52 +148,52 @@ public class TestHiveBinarySearchRecordReader extends TestCase {
hbsReader.initIOContext(hiveSplit, conf, Class.class, rcfReader);
}
- private boolean executeDoNext(HiveContextAwareRecordReader hbsReader) throws IOException {
+ private boolean executeDoNext() throws IOException {
return hbsReader.next(hbsReader.createKey(), hbsReader.createValue());
}
public void testNonLinearGreaterThan() throws Exception {
init();
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
verify(rcfReader).sync(50);
ioContext.setComparison(1);
when(rcfReader.getPos()).thenReturn(25L);
// By setting the comparison to greater, the search should use the block [0, 50]
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
verify(rcfReader).sync(25);
}
public void testNonLinearLessThan() throws Exception {
init();
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
verify(rcfReader).sync(50);
ioContext.setComparison(-1);
when(rcfReader.getPos()).thenReturn(75L);
// By setting the comparison to less, the search should use the block [50, 100]
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
verify(rcfReader).sync(75);
}
public void testNonLinearEqualTo() throws Exception {
init();
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
verify(rcfReader).sync(50);
ioContext.setComparison(0);
when(rcfReader.getPos()).thenReturn(25L);
// By setting the comparison to equal, the search should use the block [0, 50]
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
verify(rcfReader).sync(25);
}
public void testHitLastBlock() throws Exception {
init();
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
verify(rcfReader).sync(50);
ioContext.setComparison(-1);
@@ -202,7 +202,7 @@ public class TestHiveBinarySearchRecordReader extends TestCase {
// When sync is called it will return 100, the value signaling the end of the file, this should
// result in a call to sync to the beginning of the block it was searching [50, 100], and it
// should continue normally
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
InOrder inOrder = inOrder(rcfReader);
inOrder.verify(rcfReader).sync(75);
inOrder.verify(rcfReader).sync(50);
@@ -211,14 +211,14 @@ public class TestHiveBinarySearchRecordReader extends TestCase {
public void testHitSamePositionTwice() throws Exception {
init();
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
verify(rcfReader).sync(50);
ioContext.setComparison(1);
// When getPos is called it should return the same value, signaling the end of the search, so
// the search should continue linearly and it should sync to the beginning of the block [0, 50]
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
InOrder inOrder = inOrder(rcfReader);
inOrder.verify(rcfReader).sync(25);
inOrder.verify(rcfReader).sync(0);
@@ -228,20 +228,20 @@ public class TestHiveBinarySearchRecordReader extends TestCase {
public void testResetRange() throws Exception {
init();
InOrder inOrder = inOrder(rcfReader);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
inOrder.verify(rcfReader).sync(50);
ioContext.setComparison(-1);
when(rcfReader.getPos()).thenReturn(75L);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
inOrder.verify(rcfReader).sync(75);
ioContext.setEndBinarySearch(true);
// This should make the search linear, sync to the beginning of the block being searched
// [50, 100], set the comparison to be null, and the flag to reset the range should be unset
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
inOrder.verify(rcfReader).sync(50);
Assert.assertFalse(ioContext.isBinarySearching());
Assert.assertFalse(ioContext.shouldEndBinarySearch());
@@ -251,68 +251,68 @@ public class TestHiveBinarySearchRecordReader extends TestCase {
init();
ioContext.setGenericUDFClassName(GenericUDFOPEqual.class.getName());
Assert.assertTrue(ioContext.isBinarySearching());
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setBinarySearching(false);
ioContext.setComparison(-1);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setComparison(0);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setComparison(1);
- Assert.assertFalse(executeDoNext(hbsReader));
+ Assert.assertFalse(executeDoNext());
}
public void testLessThanOpClass() throws Exception {
init();
ioContext.setGenericUDFClassName(GenericUDFOPLessThan.class.getName());
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
Assert.assertFalse(ioContext.isBinarySearching());
ioContext.setComparison(-1);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setComparison(0);
- Assert.assertFalse(executeDoNext(hbsReader));
+ Assert.assertFalse(executeDoNext());
ioContext.setComparison(1);
- Assert.assertFalse(executeDoNext(hbsReader));
+ Assert.assertFalse(executeDoNext());
}
public void testLessThanOrEqualOpClass() throws Exception {
init();
ioContext.setGenericUDFClassName(GenericUDFOPEqualOrLessThan.class.getName());
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
Assert.assertFalse(ioContext.isBinarySearching());
ioContext.setComparison(-1);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setComparison(0);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setComparison(1);
- Assert.assertFalse(executeDoNext(hbsReader));
+ Assert.assertFalse(executeDoNext());
}
public void testGreaterThanOpClass() throws Exception {
init();
ioContext.setGenericUDFClassName(GenericUDFOPGreaterThan.class.getName());
Assert.assertTrue(ioContext.isBinarySearching());
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setBinarySearching(false);
ioContext.setComparison(-1);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setComparison(0);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setComparison(1);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
}
public void testGreaterThanOrEqualOpClass() throws Exception {
init();
ioContext.setGenericUDFClassName(GenericUDFOPEqualOrGreaterThan.class.getName());
Assert.assertTrue(ioContext.isBinarySearching());
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setBinarySearching(false);
ioContext.setComparison(-1);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setComparison(0);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
ioContext.setComparison(1);
- Assert.assertTrue(executeDoNext(hbsReader));
+ Assert.assertTrue(executeDoNext());
}
public static void main(String[] args) throws Exception {