Github user twdsilva commented on a diff in the pull request:
https://github.com/apache/phoenix/pull/309#discussion_r217209395
--- Diff: phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java ---
@@ -157,6 +192,216 @@ public static void setOutput(final Job job, final String tableName,final String
         PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns.split(","));
     }
+    /**
+     * Generate a query plan for a MapReduce job query.
+     * @param configuration The MapReduce job configuration
+     * @return Query plan for the MapReduce job
+     * @throws SQLException If the plan cannot be generated
+     */
+    public static QueryPlan getQueryPlan(final Configuration configuration) throws SQLException {
+        return getQueryPlan(configuration, false);
+    }
+
+    /**
+     * Generate a query plan for a MapReduce job query
+     * @param configuration The MapReduce job configuration
+     * @param isTargetConnection Whether the query plan is for the target HBase cluster
+     * @return Query plan for the MapReduce job
+     * @throws SQLException If the plan cannot be generated
+     */
+    public static QueryPlan getQueryPlan(final Configuration configuration,
+            boolean isTargetConnection) throws SQLException {
+        Preconditions.checkNotNull(configuration);
+        try {
+            final String txnScnValue = configuration.get(PhoenixConfigurationUtil.TX_SCN_VALUE);
+            final String currentScnValue = configuration.get(PhoenixConfigurationUtil.CURRENT_SCN_VALUE);
+            final String tenantId = configuration.get(PhoenixConfigurationUtil.MAPREDUCE_TENANT_ID);
+            final Properties overridingProps = new Properties();
+            if(txnScnValue==null && currentScnValue!=null) {
+                overridingProps.put(PhoenixRuntime.CURRENT_SCN_ATTRIB, currentScnValue);
+            }
+            if (tenantId != null && configuration.get(PhoenixRuntime.TENANT_ID_ATTRIB) == null){
+                overridingProps.put(PhoenixRuntime.TENANT_ID_ATTRIB, tenantId);
+            }
+
+            final Connection connection;
+            final String selectStatement;
+            if (isTargetConnection) {
+                String targetTable = PhoenixConfigurationUtil.getInputTargetTableName(configuration);
+                if (!Strings.isNullOrEmpty(targetTable)) {
+                    // different table on same cluster
+                    connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
+                    selectStatement =
+                            PhoenixConfigurationUtil.getSelectStatement(configuration, true);
+                } else {
+                    // same table on different cluster
+                    connection = ConnectionUtil.getTargetInputConnection(configuration, overridingProps);
+                    selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+                }
+            } else {
+                connection = ConnectionUtil.getInputConnection(configuration, overridingProps);
+                selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration);
+            }
+            Preconditions.checkNotNull(selectStatement);
+            final Statement statement = connection.createStatement();
+
+            final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class);
+            // Optimize the query plan so that we potentially use secondary indexes
+            final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement);
+            final Scan scan = queryPlan.getContext().getScan();
+            // since we can't set a scn on connections with txn set TX_SCN attribute so that the
+            // max time range is set by BaseScannerRegionObserver
+            if (txnScnValue != null) {
+                scan.setAttribute(BaseScannerRegionObserver.TX_SCN, Bytes.toBytes(Long.valueOf(txnScnValue)));
+            }
+
+            // setting the snapshot configuration
+            String snapshotName = configuration.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+            if (snapshotName != null)
+                PhoenixConfigurationUtil.setSnapshotNameKey(queryPlan.getContext().getConnection().
+                    getQueryServices().getConfiguration(), snapshotName);
+
+            // Initialize the query plan so it sets up the parallel scans
+            queryPlan.iterator(MapReduceParallelScanGrouper.getInstance());
+            return queryPlan;
+        } catch (Exception exception) {
+            LOG.error(String.format("Failed to get the query plan with error [%s]",
+                exception.getMessage()));
+            throw new RuntimeException(exception);
+        }
+    }
+
+
+
+    /**
+     * Generates the input splits for a MapReduce job.
+     * @param qplan Query plan for the job
+     * @param splits The key range splits for the job
+     * @param config The job configuration
+     * @return Input splits for the job
+     * @throws IOException If the region information for the splits cannot be retrieved
+     */
+    public static List<InputSplit> generateSplits(final QueryPlan qplan,
+            final List<KeyRange> splits, Configuration config) throws IOException {
+        Preconditions.checkNotNull(qplan);
+        Preconditions.checkNotNull(splits);
+
+        // Get the RegionSizeCalculator
+        try(org.apache.hadoop.hbase.client.Connection connection =
+                HBaseFactoryProvider.getHConnectionFactory().createConnection(config)) {
+            RegionLocator regionLocator = connection.getRegionLocator(TableName
+                    .valueOf(qplan.getTableRef().getTable().getPhysicalName().toString()));
+            RegionSizeCalculator sizeCalculator = new RegionSizeCalculator(regionLocator, connection.getAdmin());
+
+            final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size());
+            for (List<Scan> scans : qplan.getScans()) {
+                // Get the region location
+                HRegionLocation location = regionLocator.getRegionLocation(scans.get(0).getStartRow(), false);
+
+                String regionLocation = location.getHostname();
+
+                // Get the region size
+                long regionSize = sizeCalculator.getRegionSize(location.getRegion().getRegionName());
+
+                // Generate splits based off statistics, or just region splits?
+                boolean splitByStats = PhoenixConfigurationUtil.getSplitByStats(config);
+
+                if (splitByStats) {
+                    for (Scan aScan : scans) {
+                        if (LOG.isDebugEnabled()) {
+                            LOG.debug("Split for scan : " + aScan + "with scanAttribute : " + aScan
+                                    .getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : ["
+                                    + aScan.getCaching() + ", " + aScan.getCacheBlocks() + ", " + aScan.getBatch()
+                                    + "] and regionLocation : " + regionLocation);
+                        }
+
+                        psplits.add(
+                                new PhoenixInputSplit(Collections.singletonList(aScan), regionSize,
+                                        regionLocation));
+                    }
+                } else {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Scan count[" + scans.size() + "] : " + Bytes
+                                .toStringBinary(scans.get(0).getStartRow()) + " ~ " + Bytes
+                                .toStringBinary(scans.get(scans.size() - 1).getStopRow()));
+                        LOG.debug("First scan : " + scans.get(0) + "with scanAttribute : " + scans
+                                .get(0).getAttributesMap() + " [scanCache, cacheBlock, scanBatch] : " +
+                                "[" + scans.get(0).getCaching() + ", " + scans.get(0).getCacheBlocks()
+                                + ", " + scans.get(0).getBatch() + "] and regionLocation : " +
+                                regionLocation);
+
+                        for (int i = 0, limit = scans.size(); i < limit; i++) {
+                            LOG.debug("EXPECTED_UPPER_REGION_KEY[" + i + "] : " + Bytes
+                                    .toStringBinary(scans.get(i).getAttribute(
+                                            BaseScannerRegionObserver.EXPECTED_UPPER_REGION_KEY)));
+                        }
+                    }
+
+                    psplits.add(new PhoenixInputSplit(scans, regionSize, regionLocation));
+                }
+            }
+            return psplits;
+        }
+    }
+
+    public static ResultIterator initializeIterator(Configuration conf, List<Scan> scans,
+            QueryPlan queryPlan) throws SQLException, IOException {
+        List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size());
+        StatementContext ctx = queryPlan.getContext();
+        ReadMetricQueue readMetrics = ctx.getReadMetricsQueue();
+        String tableName = queryPlan.getTableRef().getTable().getPhysicalName().getString();
+        String snapshotName = conf.get(PhoenixConfigurationUtil.SNAPSHOT_NAME_KEY);
+
+        // Clear the table region boundary cache to make sure long running jobs stay up to date
+        byte[] tableNameBytes = queryPlan.getTableRef().getTable().getPhysicalName().getBytes();
--- End diff ---
I think the code to clear the table region boundary cache was added as part of
PHOENIX-2599. Since the MR/Spark/Pig integrations do not support ORDER BY, which
requires the rows from a scan to be in sorted order, we can skip the region
boundary check in BaseScannerRegionObserver.preScannerOpen and also clear the
region boundary cache (see the discussion on PHOENIX-2599).
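As a rough illustration only (not part of this patch), skipping the check could
amount to tagging each MR scan before the scanner is opened, assuming an attribute
along the lines of BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK is honored
server side. The attribute name and the exact mechanism are assumptions here; the
agreed approach should follow the PHOENIX-2599 discussion.

    // Hypothetical sketch: mark the MR scans so the server-side observer can
    // skip the region boundary check in preScannerOpen, rather than relying on
    // a freshly cleared region boundary cache.
    // SKIP_REGION_BOUNDARY_CHECK is an assumed attribute name, see PHOENIX-2599.
    for (List<Scan> scanList : queryPlan.getScans()) {
        for (Scan scan : scanList) {
            scan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK,
                Bytes.toBytes(true));
        }
    }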
---