Github user karanmehta93 commented on a diff in the pull request:

    https://github.com/apache/phoenix/pull/309#discussion_r219294186
  
    --- Diff: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java ---
    @@ -157,6 +192,216 @@ public static void setOutput(final Job job, final String tableName,final String
             
PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns.split(","));
         }
     
    +    /**
    +     * Generate a query plan for a MapReduce job query.
    +     * @param configuration The MapReduce job configuration
    +     * @return Query plan for the MapReduce job
    +     * @throws SQLException If the plan cannot be generated
    +     */
    +    public static QueryPlan getQueryPlan(final Configuration configuration)
    +            throws SQLException {
    +        return getQueryPlan(configuration, false);
    +    }
    +
    +    /**
    +     * Generate a query plan for a MapReduce job query
    +     * @param configuration The MapReduce job configuration
    +     * @param isTargetConnection Whether the query plan is for the target 
HBase cluster
    +     * @return Query plan for the MapReduce job
    +     * @throws SQLException If the plan cannot be generated
    +     */
    +    public static QueryPlan getQueryPlan(final Configuration configuration,
    +            boolean isTargetConnection) throws SQLException {
    +        Preconditions.checkNotNull(configuration);
    +        try {
    +            final String txnScnValue = 
configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
    +            final String currentScnValue = 
configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
    +            final String tenantId = 
configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
    +            final Properties overridingProps = new Properties();
    +            if(txnScnValue==null && currentScnValue!=null) {
    +                overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, 
currentScnValue);
    +            }
    +            if (tenantId != null && 
configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){
    +                overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, 
tenantId);
    +            }
    +
    +            final Connection connection;
    +            final String selectStatement;
    +            if (isTargetConnection) {
    +                String targetTable = 
PhoenixConfigurationUtil.getInputTargetTableName(configuration);
    +                if (!Strings.isNullOrEmpty(targetTable)) {
    +                    // different table on same cluster
    +                    connection = 
ConnectionUtil.getInputConnection(configuration, overridingProps);
    +                    selectStatement =
    +                            
PhoenixConfigurationUtil.getSelectStatement(configuration, true);
    +                } else {
    +                    // same table on different cluster
    +                    connection = 
ConnectionUtil.getTargetInputConnection(configuration, overridingProps);
    +                    selectStatement = 
PhoenixConfigurationUtil.getSelectStatement(configuration);
    +                }
    +            } else {
    +                connection = 
ConnectionUtil.getInputConnection(configuration, overridingProps);
    +                selectStatement = 
PhoenixConfigurationUtil.getSelectStatement(configuration);
    +            }
    +            Preconditions.checkNotNull(selectStatement);
    +            final Statement statement = connection.createStatement();
    +
    +            final PhoenixStatement pstmt = 
statement.unwrap(PhoenixStatement.class);
    +            // Optimize the query plan so that we potentially use 
secondary indexes
    +            final QueryPlan queryPlan = 
pstmt.optimizeQuery(selectStatement);
    +            final Scan scan = queryPlan.getContext().getScan();
    +            // since we can't set a scn on connections with txn set TX_SCN 
attribute so that the
    +            // max time range is set by BaseScannerRegionObserver
    +            if (txnScnValue != null) {
    +                scan.setAttribute(BaseScannerRegionObserver.TX_SCN, 
Bytes.toBytes(Long.valueOf(txnScnValue)));
    +            }
    +
    +            // setting the snapshot configuration
    +            String snapshotName = 
configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
    +            if (snapshotName != null)
    --- End diff --
    
    nit: add braces around the body of this `if (snapshotName != null)` statement.


---

Reply via email to