Repository: phoenix Updated Branches: refs/heads/master 051e40843 -> cd8c6b877
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index cb072ed..1aac3c5 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -153,6 +153,7 @@ public class TestUtil { public static final String ATABLE_SCHEMA_NAME = ""; public static final String BTABLE_NAME = "BTABLE"; public static final String STABLE_NAME = "STABLE"; + public static final String STABLE_PK_NAME = "ID"; public static final String STABLE_SCHEMA_NAME = ""; public static final String GROUPBYTEST_NAME = "GROUPBYTEST"; public static final String CUSTOM_ENTITY_DATA_FULL_NAME = "CORE.CUSTOM_ENTITY_DATA"; @@ -451,4 +452,44 @@ public class TestUtil { upsertRow(conn, "DESC", i, inputList.get(i)); } } + + public static List<KeyRange> getAllSplits(Connection conn, String tableName) throws SQLException { + return getSplits(conn, tableName, null, null, null, null); + } + + public static List<KeyRange> getAllSplits(Connection conn, String tableName, String where) throws SQLException { + return getSplits(conn, tableName, null, null, null, where); + } + + public static List<KeyRange> getSplits(Connection conn, String tableName, String pkCol, byte[] lowerRange, byte[] upperRange, String whereClauseSuffix) throws SQLException { + String whereClauseStart = + (lowerRange == null && upperRange == null ? "" : + " WHERE " + ((lowerRange != null ? (pkCol + " >= ? " + (upperRange != null ? " AND " : "")) : "") + + (upperRange != null ? (pkCol + " < ?") : "" ))); + String whereClause = whereClauseSuffix == null ? whereClauseStart : whereClauseStart.length() == 0 ? (" WHERE " + whereClauseSuffix) : (" AND " + whereClauseSuffix); + String query = "SELECT COUNT(*) FROM " + tableName + whereClause; + PhoenixPreparedStatement pstmt = conn.prepareStatement(query).unwrap(PhoenixPreparedStatement.class); + if (lowerRange != null) { + pstmt.setBytes(1, lowerRange); + } + if (upperRange != null) { + pstmt.setBytes(lowerRange != null ? 2 : 1, upperRange); + } + pstmt.execute(); + List<KeyRange> keyRanges = pstmt.getQueryPlan().getSplits(); + return keyRanges; + } + + public static List<KeyRange> getSplits(Connection conn, byte[] lowerRange, byte[] upperRange) throws SQLException { + return getSplits(conn, STABLE_NAME, STABLE_PK_NAME, lowerRange, upperRange, null); + } + + public static List<KeyRange> getAllSplits(Connection conn) throws SQLException { + return getAllSplits(conn, STABLE_NAME); + } + + public static void analyzeTable(Connection conn, String tableName) throws IOException, SQLException { + String query = "ANALYZE " + tableName; + conn.createStatement().execute(query); + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/d018cc1c/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java index 4326876..bd1df97 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java @@ -28,7 +28,6 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; @@ -36,13 +35,9 @@ import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.jdbc.PhoenixStatement; import org.apache.phoenix.pig.PhoenixPigConfiguration; import org.apache.phoenix.query.KeyRange; -import org.apache.phoenix.schema.SaltingUtil; -import org.apache.phoenix.schema.TableRef; -import org.apache.phoenix.util.ScanUtil; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; @@ -91,22 +86,8 @@ public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixR Preconditions.checkNotNull(qplan); Preconditions.checkNotNull(splits); final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size()); - final StatementContext context = qplan.getContext(); - final TableRef tableRef = qplan.getTableRef(); - for (KeyRange split : splits) { - final Scan splitScan = new Scan(context.getScan()); - if (tableRef.getTable().getBucketNum() != null) { - KeyRange minMaxRange = context.getMinMaxRange(); - if (minMaxRange != null) { - minMaxRange = SaltingUtil.addSaltByte(split.getLowerRange(), minMaxRange); - split = split.intersect(minMaxRange); - } - } - // as the intersect code sets the actual start and stop row within the passed splitScan, we are fetching it back below. - if (ScanUtil.intersectScanRange(splitScan, split.getLowerRange(), split.getUpperRange(), context.getScanRanges().useSkipScanFilter())) { - final PhoenixInputSplit inputSplit = new PhoenixInputSplit(KeyRange.getKeyRange(splitScan.getStartRow(), splitScan.getStopRow())); - psplits.add(inputSplit); - } + for (KeyRange split : qplan.getSplits()) { + psplits.add(new PhoenixInputSplit(split)); } return psplits; } @@ -147,6 +128,8 @@ public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixR final Statement statement = connection.createStatement(); final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); this.queryPlan = pstmt.compileQuery(selectStatement); + // FIXME: why is getting the iterator necessary here, as it will + // cause the query to run. this.queryPlan.iterator(); } catch(Exception exception) { LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()));
