Github user karanmehta93 commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/309#discussion_r219294186
--- Diff:
phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java
---
@@ -157,6 +192,216 @@ public static void setOutput(final Job job, final
String tableName,final String
PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns.split(","));
}
+ /**
+ * Generate a query plan for a MapReduce job query.
+ * @param configuration The MapReduce job configuration
+ * @return Query plan for the MapReduce job
+ * @throws SQLException If the plan cannot be generated
+ */
+ public static QueryPlan getQueryPlan(final Configuration configuration)
+ throws SQLException {
+ return getQueryPlan(configuration, false);
+ }
+
+ /**
+ * Generate a query plan for a MapReduce job query
+ * @param configuration The MapReduce job configuration
+ * @param isTargetConnection Whether the query plan is for the target HBase cluster
+ * @return Query plan for the MapReduce job
+ * @throws SQLException If the plan cannot be generated
+ */
+ public static QueryPlan getQueryPlan(final Configuration configuration,
+ boolean isTargetConnection) throws SQLException {
+ Preconditions.checkNotNull(configuration);
+ try {
+ final String txnScnValue =
configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+ final String currentScnValue =
configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+ final String tenantId =
configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+ final Properties overridingProps = new Properties();
+ if(txnScnValue==null && currentScnValue!=null) {
+ overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB,
currentScnValue);
+ }
+ if (tenantId != null &&
configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){
+ overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB,
tenantId);
+ }
+
+ final Connection connection;
+ final String selectStatement;
+ if (isTargetConnection) {
+ String targetTable =
PhoenixConfigurationUtil.getInputTargetTableName(configuration);
+ if (!Strings.isNullOrEmpty(targetTable)) {
+ // different table on same cluster
+ connection =
ConnectionUtil.getInputConnection(configuration, overridingProps);
+ selectStatement =
+
PhoenixConfigurationUtil.getSelectStatement(configuration, true);
+ } else {
+ // same table on different cluster
+ connection =
ConnectionUtil.getTargetInputConnection(configuration, overridingProps);
+ selectStatement =
PhoenixConfigurationUtil.getSelectStatement(configuration);
+ }
+ } else {
+ connection =
ConnectionUtil.getInputConnection(configuration, overridingProps);
+ selectStatement =
PhoenixConfigurationUtil.getSelectStatement(configuration);
+ }
+ Preconditions.checkNotNull(selectStatement);
+ final Statement statement = connection.createStatement();
+
+ final PhoenixStatement pstmt =
statement.unwrap(PhoenixStatement.class);
+ // Optimize the query plan so that we potentially use secondary indexes
+ final QueryPlan queryPlan =
pstmt.optimizeQuery(selectStatement);
+ final Scan scan = queryPlan.getContext().getScan();
+ // since we can't set a scn on connections with txn set TX_SCN attribute so that the
+ // max time range is set by BaseScannerRegionObserver
+ if (txnScnValue != null) {
+ scan.setAttribute(BaseScannerRegionObserver.TX_SCN,
Bytes.toBytes(Long.valueOf(txnScnValue)));
+ }
+
+ // setting the snapshot configuration
+ String snapshotName =
configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+ if (snapshotName != null)
--- End diff --
nit: please add braces around the body of this `if (snapshotName != null)` statement.
---