PHOENIX-3486 RoundRobinResultIterator doesn't work correctly because of setting Scan's cache size inappropriately in PhoenixInputForamt
Signed-off-by: Sergey Soldatov <s...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/280b32ad Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/280b32ad Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/280b32ad Branch: refs/heads/4.x-HBase-0.98 Commit: 280b32ad61aa8f0c30cf4e4896d1305c59ad74c0 Parents: 54ae6f3 Author: Jeongdae Kim <kjd9...@gmail.com> Authored: Mon Nov 21 11:03:19 2016 +0900 Committer: Sergey Soldatov <s...@apache.org> Committed: Sun Feb 19 17:21:06 2017 -0800 ---------------------------------------------------------------------- .../hive/mapreduce/PhoenixInputFormat.java | 28 ++++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/280b32ad/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java index 7e2f3d1..2b56e99 100644 --- a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java +++ b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.HConnection; @@ -142,14 +143,12 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri .newJobContext(new Job(jobConf))); boolean splitByStats = jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS, false); - int scanCacheSize = jobConf.getInt(PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE, -1); + setScanCacheSize(jobConf); - if (LOG.isDebugEnabled()) { - LOG.debug("Generating splits with scanCacheSize : " + scanCacheSize); - } String tableName = qplan .getTableRef().getTable().getPhysicalName().toString(); HTable table = new HTable(jobConf, tableName); + // Adding Localization HConnection connection = HConnectionManager.createConnection(jobConf); RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(table); @@ -166,10 +165,6 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri if (splitByStats) { for (Scan aScan : scans) { - if (scanCacheSize > 0) { - aScan.setCaching(scanCacheSize); - } - if (LOG.isDebugEnabled()) { LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : [" + @@ -183,12 +178,6 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri psplits.add(inputSplit); } } else { - if (scanCacheSize > 0) { - for (Scan aScan : scans) { - aScan.setCaching(scanCacheSize); - } - } - if (LOG.isDebugEnabled()) { LOG.debug("Scan count[" + scans.size() + "] : " + Bytes.toStringBinary(scans .get(0).getStartRow()) + " ~ " + Bytes.toStringBinary(scans.get(scans @@ -216,6 +205,17 @@ public class PhoenixInputFormat<T extends DBWritable> implements InputFormat<Wri return psplits; } + private void setScanCacheSize(JobConf jobConf) { + int scanCacheSize = jobConf.getInt(PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE, -1); + if (scanCacheSize > 0) { + jobConf.setInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, scanCacheSize); + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Generating splits with scanCacheSize : " + scanCacheSize); + } + } + @Override public RecordReader<WritableComparable, T> getRecordReader(InputSplit split, JobConf job, Reporter reporter) throws