PHOENIX-3486 RoundRobinResultIterator doesn't work correctly because of setting 
Scan's cache size inappropriately in PhoenixInputFormat

Signed-off-by: Sergey Soldatov <s...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/816840ba
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/816840ba
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/816840ba

Branch: refs/heads/4.x-HBase-1.1
Commit: 816840ba9e4d9e5054b4ffb46d1fe806790affad
Parents: 893bad3
Author: Jeongdae Kim <kjd9...@gmail.com>
Authored: Mon Nov 21 11:03:19 2016 +0900
Committer: Sergey Soldatov <s...@apache.org>
Committed: Sun Feb 19 17:22:04 2017 -0800

----------------------------------------------------------------------
 .../hive/mapreduce/PhoenixInputFormat.java      | 27 ++++++++++----------
 1 file changed, 13 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/816840ba/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git 
a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
index 7eab317..3a94655 100644
--- 
a/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
+++ 
b/phoenix-hive/src/main/java/org/apache/phoenix/hive/mapreduce/PhoenixInputFormat.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HConnection;
@@ -142,11 +143,8 @@ public class PhoenixInputFormat<T extends DBWritable> 
implements InputFormat<Wri
                 .newJobContext(new Job(jobConf)));
         boolean splitByStats = 
jobConf.getBoolean(PhoenixStorageHandlerConstants.SPLIT_BY_STATS,
                 false);
-        int scanCacheSize = 
jobConf.getInt(PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE, -1);
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Generating splits with scanCacheSize : " + 
scanCacheSize);
-        }
+        setScanCacheSize(jobConf);
 
         // Adding Localization
         HConnection connection = HConnectionManager.createConnection(jobConf);
@@ -166,10 +164,6 @@ public class PhoenixInputFormat<T extends DBWritable> 
implements InputFormat<Wri
 
             if (splitByStats) {
                 for (Scan aScan : scans) {
-                    if (scanCacheSize > 0) {
-                        aScan.setCaching(scanCacheSize);
-                    }
-
                     if (LOG.isDebugEnabled()) {
                         LOG.debug("Split for  scan : " + aScan + "with 
scanAttribute : " + aScan
                                 .getAttributesMap() + " [scanCache, 
cacheBlock, scanBatch] : [" +
@@ -183,12 +177,6 @@ public class PhoenixInputFormat<T extends DBWritable> 
implements InputFormat<Wri
                     psplits.add(inputSplit);
                 }
             } else {
-                if (scanCacheSize > 0) {
-                    for (Scan aScan : scans) {
-                        aScan.setCaching(scanCacheSize);
-                    }
-                }
-
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Scan count[" + scans.size() + "] : " + 
Bytes.toStringBinary(scans
                             .get(0).getStartRow()) + " ~ " + 
Bytes.toStringBinary(scans.get(scans
@@ -216,6 +204,17 @@ public class PhoenixInputFormat<T extends DBWritable> 
implements InputFormat<Wri
         return psplits;
     }
 
+    private void setScanCacheSize(JobConf jobConf) {
+        int scanCacheSize = 
jobConf.getInt(PhoenixStorageHandlerConstants.HBASE_SCAN_CACHE, -1);
+        if (scanCacheSize > 0) {
+            jobConf.setInt(HConstants.HBASE_CLIENT_SCANNER_CACHING, 
scanCacheSize);
+        }
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Generating splits with scanCacheSize : " + 
scanCacheSize);
+        }
+    }
+
     @Override
     public RecordReader<WritableComparable, T> getRecordReader(InputSplit 
split, JobConf job,
                                                                Reporter 
reporter) throws

Reply via email to