Github user twdsilva commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/309#discussion_r217209395
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java ---
    @@ -157,6 +192,216 @@ public static void setOutput(final Job job, final String tableName,final String
             PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns.split(","));
         }
     
    +    /**
    +     * Generate a query plan for a MapReduce job query.
    +     * @param configuration The MapReduce job configuration
    +     * @return Query plan for the MapReduce job
    +     * @throws SQLException If the plan cannot be generated
    +     */
    +    public static QueryPlan getQueryPlan(final Configuration configuration)
    +            throws SQLException {
    +        return getQueryPlan(configuration, false);
    +    }
    +
    +    /**
    +     * Generate a query plan for a MapReduce job query
    +     * @param configuration The MapReduce job configuration
    +     * @param isTargetConnection Whether the query plan is for the target HBase cluster
    +     * @return Query plan for the MapReduce job
    +     * @throws SQLException If the plan cannot be generated
    +     */
    +    public static QueryPlan getQueryPlan(final Configuration configuration,
    +            boolean isTargetConnection) throws SQLException {
    +        Preconditions.checkNotNull(configuration);
    +        try {
    +            final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
    +            final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
    +            final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
    +            final Properties overridingProps = new Properties();
    +            if(txnScnValue==null && currentScnValue!=null) {
    +                overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
    +            }
    +            if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){
    +                overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
    +            }
    +
    +            final Connection connection;
    +            final String selectStatement;
    +            if (isTargetConnection) {
    +                String targetTable = PhoenixConfigurationUtil.getInputTargetTableName(configuration);
    +                if (!Strings.isNullOrEmpty(targetTable)) {
    +                    // different table on same cluster
    +                    connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
    +                    selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration, true);
    +                } else {
    +                    // same table on different cluster
    +                    connection = ConnectionUtil.getTargetInputConnection(configuration, overridingProps);
    +                    selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
    +                }
    +            } else {
    +                connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
    +                selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
    +            }
    +            Preconditions.checkNotNull(selectStatement);
    +            final Statement statement = connection.createStatement();
    +
    +            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
    +            // Optimize the query plan so that we potentially use secondary indexes
    +            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
    +            final Scan scan = queryPlan.getContext().getScan();
    +            // since we can't set an SCN on connections with txn, set the TX_SCN attribute so that
    +            // the max time range is set by BaseScannerRegionObserver
    +            if (txnScnValue != null) {
    +                scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
    +            }
    +
    +            // setting the snapshot configuration
    +            String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
    +            if (snapshotName != null)
    +                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
    +                        getQueryServices().getConfiguration(), snapshotName);
    +
    +            // Initialize the query plan so it sets up the parallel scans
    +            queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
    +            return queryPlan;
    +        } catch (Exception exception) {
    +            LOG.error(String.format("Failed to get the query plan with error [%s]",
    +                    exception.getMessage()));
    +            throw new RuntimeException(exception);
    +        }
    +    }
    +
    +
    +
    +    /**
    +     * Generates the input splits for a MapReduce job.
    +     * @param qplan Query plan for the job
    +     * @param splits The key range splits for the job
    +     * @param config The job configuration
    +     * @return Input splits for the job
    +     * @throws IOException If the region information for the splits cannot be retrieved
    +     */
    +    public static List<InputSplit> generateSplits(final QueryPlan qplan,
    +            final List<KeyRange> splits, Configuration config) throws IOException {
    +        Preconditions.checkNotNull(qplan);
    +        Preconditions.checkNotNull(splits);
    +
    +        // Get the RegionSizeCalculator
    +        try (org.apache.hadoop.hbase.client.Connection connection =
    +                HBaseFactoryProvider.getHConnectionFactory().createConnection(config)) {
    +            RegionLocator regionLocator = connection.getRegionLocator(
    +                    TableName.valueOf(qplan.getTableRef().getTable().getPhysicalName().toString()));
    +            RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection.getAdmin());
    +
    +            final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
    +            for (List<Scan> scans : qplan.getScans()) {
    +                // Get the region location
    +                HRegionLocation location =
    +                        regionLocator.getRegionLocation(scans.get(0).getStartRow(), false);
    +
    +                String regionLocation = location.getHostname();
    +
    +                // Get the region size
    +                long regionSize = sizeCalculator.getRegionSize(location.getRegion().getRegionName());
    +
    +                // Generate splits based off statistics, or just region splits?
    +                boolean splitByStats = PhoenixConfigurationUtil.getSplitByStats(config);
    +
    +                if (splitByStats) {
    +                    for (Scan aScan : scans) {
    +                        if (LOG.isDebugEnabled()) {
    +                            LOG.debug("Split for  scan : " + aScan + "with scanAttribute : " + aScan
    +                                    .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : ["
    +                                    + aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", "
    +                                    + aScan.getBatch() + "] and  regionLocation : " + regionLocation);
    +                        }
    +
    +                        psplits.add(new PhoenixInputSplit(Collections.singletonList(aScan), regionSize,
    +                                regionLocation));
    +                    }
    +                } else {
    +                    if (LOG.isDebugEnabled()) {
    +                        LOG.debug("Scan count[" + scans.size() + "] : "
    +                                + Bytes.toStringBinary(scans.get(0).getStartRow()) + " ~ "
    +                                + Bytes.toStringBinary(scans.get(scans.size() - 1).getStopRow()));
    +                        LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : "
    +                                + scans.get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : ["
    +                                + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
    +                                + ", " + scans.get(0).getBatch() + "] and  regionLocation : " + regionLocation);
    +
    +                        for (int i = 0, limit = scans.size(); i < limit; i++) {
    +                            LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes.toStringBinary(
    +                                    scans.get(i).getAttribute(BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
    +                        }
    +                    }
    +
    +                    psplits.add(new PhoenixInputSplit(scans, regionSize, regionLocation));
    +                }
    +            }
    +            return psplits;
    +        }
    +    }
    +
    +    public static ResultIterator initializeIterator(Configuration conf, List<Scan> scans,
    +            QueryPlan queryPlan) throws SQLException, IOException {
    +        List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
    +        StatementContext ctx = queryPlan.getContext();
    +        ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
    +        String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
    +        String snapshotName = conf.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
    +
    +        // Clear the table region boundary cache to make sure long running jobs stay up to date
    +        byte[] tableNameBytes = queryPlan.getTableRef().getTable().getPhysicalName().getBytes();
    --- End diff --
    
    I think the code to clear the table region boundary cache was added as part of PHOENIX-2599. Since the MR/Spark/Pig integrations do not support ORDER BY, which requires rows from a scan to be in sorted order, we can skip the region boundary check in BaseScannerRegionObserver.preScannerOpen and also clear the region boundary cache.
    (See the discussion on PHOENIX-2599.)
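
    For reference, the cache refresh amounts to roughly the following (an illustrative sketch only; it assumes the ConnectionQueryServices.clearTableRegionCache(byte[]) API and a hypothetical helper name, and is not the exact code in this PR):

        // Hypothetical helper, not part of the PR: drop the cached region boundaries for the
        // physical table backing a query plan, so that long running MR jobs pick up region
        // splits/merges that happened after the job was submitted.
        private static void refreshRegionBoundaryCache(QueryPlan queryPlan) throws SQLException {
            byte[] physicalTableName =
                    queryPlan.getTableRef().getTable().getPhysicalName().getBytes();
            // Clearing the cache forces the next region lookup to repopulate it from hbase:meta.
            queryPlan.getContext().getConnection().getQueryServices()
                    .clearTableRegionCache(physicalTableName);
        }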

