Repository: phoenix
Updated Branches:
  refs/heads/master 41d6349bd -> 8f2d0fbc5


PHOENIX-3600 Core MapReduce classes don't provide location info

This mostly ports the functionality already in the phoenix-hive MR
classes to the core MapReduce classes. It also adds a new configuration
parameter, 'phoenix.mapreduce.split.by.stats' (default: true), which
creates input splits based on the scans provided by statistics rather
than only on the region locations.


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/267323da
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/267323da
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/267323da

Branch: refs/heads/master
Commit: 267323da8242fb6f0953c1a75cf96c5fde3d49ed
Parents: 41d6349
Author: Josh Mahonin <jmaho...@gmail.com>
Authored: Mon Feb 13 10:55:06 2017 -0500
Committer: Josh Mahonin <jmaho...@gmail.com>
Committed: Mon Feb 13 10:55:06 2017 -0500

----------------------------------------------------------------------
 .../phoenix/mapreduce/PhoenixInputFormat.java   | 69 ++++++++++++++++++--
 .../phoenix/mapreduce/PhoenixInputSplit.java    | 23 ++++++-
 .../util/PhoenixConfigurationUtil.java          | 11 ++++
 3 files changed, 96 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/267323da/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
index df96c7b..14f7b94 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java
@@ -21,14 +21,18 @@ import java.io.IOException;
 import java.sql.Connection;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RegionSizeCalculator;
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -42,6 +46,7 @@ import 
org.apache.phoenix.iterate.MapReduceParallelScanGrouper;
 import org.apache.phoenix.jdbc.PhoenixStatement;
 import org.apache.phoenix.mapreduce.util.ConnectionUtil;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.mapreduce.util.PhoenixMapReduceUtil;
 import org.apache.phoenix.query.KeyRange;
 import org.apache.phoenix.util.PhoenixRuntime;
 
@@ -80,16 +85,72 @@ public class PhoenixInputFormat<T extends DBWritable> 
extends InputFormat<NullWr
         final Configuration configuration = context.getConfiguration();
         final QueryPlan queryPlan = getQueryPlan(context,configuration);
         final List<KeyRange> allSplits = queryPlan.getSplits();
-        final List<InputSplit> splits = generateSplits(queryPlan,allSplits);
+        final List<InputSplit> splits = generateSplits(queryPlan, allSplits, 
configuration);
         return splits;
     }
 
-    private List<InputSplit> generateSplits(final QueryPlan qplan, final 
List<KeyRange> splits) throws IOException {
+    private List<InputSplit> generateSplits(final QueryPlan qplan, final 
List<KeyRange> splits, Configuration config) throws IOException {
         Preconditions.checkNotNull(qplan);
         Preconditions.checkNotNull(splits);
+
+        // Get the RegionSizeCalculator
+        org.apache.hadoop.hbase.client.Connection connection = 
ConnectionFactory.createConnection(config);
+        RegionLocator regionLocator = 
connection.getRegionLocator(TableName.valueOf(qplan
+                .getTableRef().getTable().getPhysicalName().toString()));
+        RegionSizeCalculator sizeCalculator = new 
RegionSizeCalculator(regionLocator, connection
+                .getAdmin());
+
+
         final List<InputSplit> psplits = 
Lists.newArrayListWithExpectedSize(splits.size());
         for (List<Scan> scans : qplan.getScans()) {
-            psplits.add(new PhoenixInputSplit(scans));
+            // Get the region location
+            HRegionLocation location = regionLocator.getRegionLocation(
+                    scans.get(0).getStartRow(),
+                    false
+            );
+
+            String regionLocation = location.getHostname();
+
+            // Get the region size
+            long regionSize = sizeCalculator.getRegionSize(
+                    location.getRegionInfo().getRegionName()
+            );
+
+            // Generate splits based off statistics, or just region splits?
+            boolean splitByStats = 
PhoenixConfigurationUtil.getSplitByStats(config);
+
+            if(splitByStats) {
+                for(Scan aScan: scans) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Split for  scan : " + aScan + "with 
scanAttribute : " + aScan
+                                .getAttributesMap() + " [scanCache, 
cacheBlock, scanBatch] : [" +
+                                aScan.getCaching() + ", " + 
aScan.getCacheBlocks() + ", " + aScan
+                                .getBatch() + "] and  regionLocation : " + 
regionLocation);
+                    }
+
+                    psplits.add(new 
PhoenixInputSplit(Collections.singletonList(aScan), regionSize, 
regionLocation));
+                }
+            }
+            else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Scan count[" + scans.size() + "] : " + 
Bytes.toStringBinary(scans
+                            .get(0).getStartRow()) + " ~ " + 
Bytes.toStringBinary(scans.get(scans
+                            .size() - 1).getStopRow()));
+                    LOG.debug("First scan : " + scans.get(0) + "with 
scanAttribute : " + scans
+                            .get(0).getAttributesMap() + " [scanCache, 
cacheBlock, scanBatch] : " +
+                            "[" + scans.get(0).getCaching() + ", " + 
scans.get(0).getCacheBlocks()
+                            + ", " + scans.get(0).getBatch() + "] and  
regionLocation : " +
+                            regionLocation);
+
+                    for (int i = 0, limit = scans.size(); i < limit; i++) {
+                        LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + 
Bytes
+                                .toStringBinary(scans.get(i).getAttribute
+                                        
(BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
+                    }
+                }
+
+                psplits.add(new PhoenixInputSplit(scans, regionSize, 
regionLocation));
+            }
         }
         return psplits;
     }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/267323da/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
index caee3cd..6d3c5e1 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java
@@ -25,7 +25,6 @@ import java.util.List;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.InputSplit;
@@ -41,6 +40,8 @@ public class PhoenixInputSplit extends InputSplit implements 
Writable {
 
     private List<Scan> scans;
     private KeyRange keyRange;
+    private String regionLocation = null;
+    private long regionSize = 0;
    
     /**
      * No Arg constructor
@@ -53,9 +54,15 @@ public class PhoenixInputSplit extends InputSplit implements 
Writable {
     * @param keyRange
     */
     public PhoenixInputSplit(final List<Scan> scans) {
+        this(scans, 0, null);
+    }
+
+    public PhoenixInputSplit(final List<Scan> scans, long regionSize, String 
regionLocation) {
         Preconditions.checkNotNull(scans);
         Preconditions.checkState(!scans.isEmpty());
         this.scans = scans;
+        this.regionSize = regionSize;
+        this.regionLocation = regionLocation;
         init();
     }
     
@@ -73,6 +80,8 @@ public class PhoenixInputSplit extends InputSplit implements 
Writable {
     
     @Override
     public void readFields(DataInput input) throws IOException {
+        regionLocation = WritableUtils.readString(input);
+        regionSize = WritableUtils.readVLong(input);
         int count = WritableUtils.readVInt(input);
         scans = Lists.newArrayListWithExpectedSize(count);
         for (int i = 0; i < count; i++) {
@@ -87,6 +96,9 @@ public class PhoenixInputSplit extends InputSplit implements 
Writable {
     
     @Override
     public void write(DataOutput output) throws IOException {
+        WritableUtils.writeString(output, regionLocation);
+        WritableUtils.writeVLong(output, regionSize);
+
         Preconditions.checkNotNull(scans);
         WritableUtils.writeVInt(output, scans.size());
         for (Scan scan : scans) {
@@ -99,12 +111,17 @@ public class PhoenixInputSplit extends InputSplit 
implements Writable {
 
     @Override
     public long getLength() throws IOException, InterruptedException {
-         return 0;
+         return regionSize;
     }
 
     @Override
     public String[] getLocations() throws IOException, InterruptedException {
-        return new String[]{};
+        if(regionLocation == null) {
+            return new String[]{};
+        }
+        else {
+            return new String[]{regionLocation};
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/267323da/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
index f3e4450..1d2cbbe 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java
@@ -106,6 +106,11 @@ public final class PhoenixConfigurationUtil {
     
     public static final String DISABLED_INDEXES = 
"phoenix.mr.index.disabledIndexes";
 
+    // Generate splits based on scans from stats, or just from region splits
+    public static final String MAPREDUCE_SPLIT_BY_STATS = 
"phoenix.mapreduce.split.by.stats";
+
+    public static final boolean DEFAULT_SPLIT_BY_STATS = true;
+
     public enum SchemaType {
         TABLE,
         QUERY;
@@ -459,4 +464,10 @@ public final class PhoenixConfigurationUtil {
         Preconditions.checkNotNull(configuration);
         return configuration.get(DISABLED_INDEXES);
     }
+
+    public static boolean getSplitByStats(final Configuration configuration) {
+        Preconditions.checkNotNull(configuration);
+        boolean split = configuration.getBoolean(MAPREDUCE_SPLIT_BY_STATS, 
DEFAULT_SPLIT_BY_STATS);
+        return split;
+    }
 }

Reply via email to