PHOENIX-1763 Support building with HBase-1.1.0
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/98271b88 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/98271b88 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/98271b88 Branch: refs/heads/4.x-HBase-1.1 Commit: 98271b888c113f10e174205434e05d3b36b7eb67 Parents: bf01eb2 Author: Enis Soztutar <e...@apache.org> Authored: Thu May 21 23:08:26 2015 -0700 Committer: Enis Soztutar <e...@apache.org> Committed: Fri May 22 00:30:56 2015 -0700 ---------------------------------------------------------------------- phoenix-core/pom.xml | 17 +++-- .../regionserver/IndexHalfStoreFileReader.java | 31 ++++++-- .../regionserver/IndexSplitTransaction.java | 39 ++++++++-- .../hbase/regionserver/LocalIndexMerger.java | 3 +- .../cache/aggcache/SpillableGroupByCache.java | 13 +++- .../phoenix/coprocessor/BaseRegionScanner.java | 12 +-- .../coprocessor/BaseScannerRegionObserver.java | 77 +++++++++++--------- .../coprocessor/DelegateRegionScanner.java | 23 ++++-- .../GroupedAggregateRegionObserver.java | 53 ++++++++------ .../coprocessor/HashJoinRegionScanner.java | 60 ++++++++------- .../coprocessor/MetaDataRegionObserver.java | 23 +++--- .../phoenix/coprocessor/ScanRegionObserver.java | 11 ++- .../UngroupedAggregateRegionObserver.java | 55 +++++++------- .../hbase/index/covered/data/LocalTable.java | 2 +- .../index/covered/filter/FamilyOnlyFilter.java | 6 +- .../index/scanner/FilteredKeyValueScanner.java | 2 +- .../phoenix/index/PhoenixIndexBuilder.java | 6 +- .../iterate/RegionScannerResultIterator.java | 9 ++- .../phoenix/schema/stats/StatisticsScanner.java | 10 ++- .../hbase/ipc/PhoenixIndexRpcSchedulerTest.java | 6 +- .../index/covered/TestLocalTableState.java | 1 - .../covered/filter/TestFamilyOnlyFilter.java | 12 +-- .../index/write/TestWALRecoveryCaching.java | 4 +- phoenix-flume/pom.xml | 9 --- phoenix-pig/pom.xml | 31 +++++--- phoenix-spark/pom.xml | 7 ++ pom.xml | 41 ++++++++++- 27 files changed, 361 insertions(+), 202 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml index 45b8d73..22e6b60 100644 --- a/phoenix-core/pom.xml +++ b/phoenix-core/pom.xml @@ -350,16 +350,25 @@ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-it</artifactId> - <version>${hbase.version}</version> <type>test-jar</type> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> + <artifactId>hbase-annotations</artifactId> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> </dependency> <dependency> <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <scope>test</scope> + <type>test-jar</type> + </dependency> + <dependency> + <groupId>org.apache.hbase</groupId> <artifactId>hbase-protocol</artifactId> </dependency> <dependency> @@ -369,18 +378,16 @@ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> - <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-server</artifactId> - <version>${hbase.version}</version> <type>test-jar</type> + <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop-compat</artifactId> - <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> @@ -391,13 +398,11 @@ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop2-compat</artifactId> - <version>${hbase.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop2-compat</artifactId> - <version>${hbase.version}</version> <type>test-jar</type> <scope>test</scope> </dependency> http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java index 49e2022..9befc8c 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexHalfStoreFileReader.java @@ -47,11 +47,11 @@ import org.apache.phoenix.index.IndexMaintainer; * that sort lowest and 'top' is the second half of the file with keys that sort greater than those * of the bottom half. The top includes the split files midkey, of the key that follows if it does * not exist in the file. - * + * * <p> * This type works in tandem with the {@link Reference} type. This class is used reading while * Reference is used writing. - * + * * <p> * This file is not splitable. Calls to {@link #midkey()} return null. */ @@ -64,7 +64,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { private final byte[] splitkey; private final byte[] splitRow; private final Map<ImmutableBytesWritable, IndexMaintainer> indexMaintainers; - private final byte[][] viewConstants; + private final byte[][] viewConstants; private final int offset; private final HRegionInfo regionInfo; private final byte[] regionStartKeyInHFile; @@ -144,6 +144,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { final HFileScanner delegate = s; public boolean atEnd = false; + @Override public ByteBuffer getKey() { if (atEnd) { return null; @@ -160,7 +161,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { // If it is top store file replace the StartKey of the Key with SplitKey return getChangedKey(delegate.getKeyValue(), changeBottomKeys); } - + private ByteBuffer getChangedKey(Cell kv, boolean changeBottomKeys) { // new KeyValue(row, family, qualifier, timestamp, type, value) byte[] newRowkey = getNewRowkeyByRegionStartKeyReplacedWithSplitKey(kv, changeBottomKeys); @@ -183,6 +184,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { return keyReplacedStartKey; } + @Override public String getKeyString() { if (atEnd) { return null; @@ -190,6 +192,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { return Bytes.toStringBinary(getKey()); } + @Override public ByteBuffer getValue() { if (atEnd) { return null; @@ -197,6 +200,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { return delegate.getValue(); } + @Override public String getValueString() { if (atEnd) { return null; @@ -204,6 +208,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { return Bytes.toStringBinary(getValue()); } + @Override public Cell getKeyValue() { if (atEnd) { return null; @@ -227,6 +232,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { return changedKv; } + @Override public boolean next() throws IOException { if (atEnd) { return false; @@ -248,10 +254,12 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { } } + @Override public boolean seekBefore(byte[] key) throws IOException { return seekBefore(key, 0, key.length); } + @Override public boolean seekBefore(byte[] key, int offset, int length) throws IOException { if (top) { @@ -282,6 +290,7 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { return seekBefore(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()); } + @Override public boolean seekTo() throws IOException { boolean b = delegate.seekTo(); if (!b) { @@ -302,10 +311,12 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { } } + @Override public int seekTo(byte[] key) throws IOException { return seekTo(key, 0, key.length); } + @Override public int seekTo(byte[] key, int offset, int length) throws IOException { if (top) { if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) { @@ -342,10 +353,12 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { return seekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()); } + @Override public int reseekTo(byte[] key) throws IOException { return reseekTo(key, 0, key.length); } + @Override public int reseekTo(byte[] key, int offset, int length) throws IOException { if (top) { if (getComparator().compare(key, offset, length, splitkey, 0, splitkey.length) < 0) { @@ -375,11 +388,13 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { return reseekTo(kv.getBuffer(), kv.getKeyOffset(), kv.getKeyLength()); } + @Override public org.apache.hadoop.hbase.io.hfile.HFile.Reader getReader() { return this.delegate.getReader(); } // TODO: Need to change as per IndexHalfStoreFileReader + @Override public boolean isSeeked() { return this.delegate.isSeeked(); } @@ -425,13 +440,13 @@ public class IndexHalfStoreFileReader extends StoreFile.Reader { /** * In case of top half store, the passed key will be with the start key of the daughter region. * But in the actual HFiles, the key will be with the start key of the old parent region. In - * order to make the real seek in the HFiles, we need to build the old key. - * + * order to make the real seek in the HFiles, we need to build the old key. + * * The logic here is just replace daughter region start key with parent region start key * in the key part. - * + * * @param key - * + * */ private KeyValue getKeyPresentInHFiles(byte[] key) { KeyValue keyValue = new KeyValue(key); http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java index 920380b..3057a14 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/IndexSplitTransaction.java @@ -165,6 +165,7 @@ public class IndexSplitTransaction extends SplitTransaction { * @return <code>true</code> if the region is splittable else * <code>false</code> if it is not (e.g. its already closed, etc.). */ + @Override public boolean prepare() { if (!this.parent.isSplittable()) return false; // Split key can be null if this region is unsplittable; i.e. has refs. @@ -215,6 +216,7 @@ public class IndexSplitTransaction extends SplitTransaction { * Call {@link #rollback(Server, RegionServerServices)} * @return Regions created */ + @Override /* package */PairOfSameType<HRegion> createDaughters(final Server server, final RegionServerServices services) throws IOException { LOG.info("Starting split of region " + this.parent); @@ -288,16 +290,19 @@ public class IndexSplitTransaction extends SplitTransaction { if (metaEntries == null || metaEntries.isEmpty()) { MetaTableAccessor.splitRegion(server.getConnection(), parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), - daughterRegions.getSecond().getRegionInfo(), server.getServerName()); + daughterRegions.getSecond().getRegionInfo(), server.getServerName(), + parent.getTableDesc().getRegionReplication()); } else { offlineParentInMetaAndputMetaEntries(server.getConnection(), parent.getRegionInfo(), daughterRegions.getFirst().getRegionInfo(), daughterRegions - .getSecond().getRegionInfo(), server.getServerName(), metaEntries); + .getSecond().getRegionInfo(), server.getServerName(), metaEntries, + parent.getTableDesc().getRegionReplication()); } } return daughterRegions; } + @Override public PairOfSameType<HRegion> stepsBeforePONR(final Server server, final RegionServerServices services, boolean testing) throws IOException { // Set ephemeral SPLITTING znode up in zk. Mocked servers sometimes don't @@ -380,6 +385,7 @@ public class IndexSplitTransaction extends SplitTransaction { * @throws IOException If thrown, transaction failed. * Call {@link #rollback(Server, RegionServerServices)} */ + @Override /* package */void openDaughters(final Server server, final RegionServerServices services, HRegion a, HRegion b) throws IOException { @@ -565,6 +571,7 @@ public class IndexSplitTransaction extends SplitTransaction { * @throws IOException * @see #rollback(Server, RegionServerServices) */ + @Override public PairOfSameType<HRegion> execute(final Server server, final RegionServerServices services) throws IOException { @@ -575,6 +582,7 @@ public class IndexSplitTransaction extends SplitTransaction { return stepsAfterPONR(server, services, regions); } + @Override public PairOfSameType<HRegion> stepsAfterPONR(final Server server, final RegionServerServices services, PairOfSameType<HRegion> regions) throws IOException { @@ -585,7 +593,7 @@ public class IndexSplitTransaction extends SplitTransaction { private void offlineParentInMetaAndputMetaEntries(Connection conn, HRegionInfo parent, HRegionInfo splitA, HRegionInfo splitB, - ServerName serverName, List<Mutation> metaEntries) throws IOException { + ServerName serverName, List<Mutation> metaEntries, int regionReplication) throws IOException { List<Mutation> mutations = metaEntries; HRegionInfo copyOfParent = new HRegionInfo(parent); copyOfParent.setOffline(true); @@ -595,7 +603,7 @@ public class IndexSplitTransaction extends SplitTransaction { Put putParent = MetaTableAccessor.makePutFromRegionInfo(copyOfParent); MetaTableAccessor.addDaughtersToPut(putParent, splitA, splitB); mutations.add(putParent); - + //Puts for daughters Put putA = MetaTableAccessor.makePutFromRegionInfo(splitA); Put putB = MetaTableAccessor.makePutFromRegionInfo(splitB); @@ -604,9 +612,18 @@ public class IndexSplitTransaction extends SplitTransaction { addLocation(putB, serverName, 1); mutations.add(putA); mutations.add(putB); + + // Add empty locations for region replicas of daughters so that number of replicas can be + // cached whenever the primary region is looked up from meta + for (int i = 1; i < regionReplication; i++) { + addEmptyLocation(putA, i); + addEmptyLocation(putB, i); + } + MetaTableAccessor.mutateMetaTable(conn, mutations); } + @Override public Put addLocation(final Put p, final ServerName sn, long openSeqNum) { p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes.toBytes(sn.getHostAndPort())); @@ -617,6 +634,13 @@ public class IndexSplitTransaction extends SplitTransaction { return p; } + private static Put addEmptyLocation(final Put p, int replicaId){ + p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getServerColumn(replicaId), null); + p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getStartCodeColumn(replicaId), null); + p.addImmutable(HConstants.CATALOG_FAMILY, MetaTableAccessor.getSeqNumColumn(replicaId), null); + return p; + } + /* * Open daughter region in its own thread. * If we fail, abort this hosting server. @@ -659,6 +683,7 @@ public class IndexSplitTransaction extends SplitTransaction { * @throws IOException * @throws KeeperException */ + @Override void openDaughterRegion(final Server server, final HRegion daughter) throws IOException, KeeperException { HRegionInfo hri = daughter.getRegionInfo(); @@ -767,6 +792,7 @@ public class IndexSplitTransaction extends SplitTransaction { this.family = family; } + @Override public Void call() throws IOException { splitStoreFile(family, sf); return null; @@ -807,6 +833,7 @@ public class IndexSplitTransaction extends SplitTransaction { * @return True if we successfully rolled back, false if we got to the point * of no return and so now need to abort the server to minimize damage. */ + @Override @SuppressWarnings("deprecation") public boolean rollback(final Server server, final RegionServerServices services) throws IOException { @@ -879,10 +906,12 @@ public class IndexSplitTransaction extends SplitTransaction { return result; } + @Override HRegionInfo getFirstDaughter() { return hri_a; } + @Override HRegionInfo getSecondDaughter() { return hri_b; } @@ -971,7 +1000,7 @@ public class IndexSplitTransaction extends SplitTransaction { return ZKAssign.transitionNode(zkw, parent, serverName, beginState, endState, znodeVersion, payload); } - + public HRegion getParent() { return this.parent; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java index f074df7..add9b72 100644 --- a/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java +++ b/phoenix-core/src/main/java/org/apache/hadoop/hbase/regionserver/LocalIndexMerger.java @@ -81,7 +81,8 @@ public class LocalIndexMerger extends BaseRegionServerObserver { this.mergedRegion = rmt.stepsBeforePONR(rss, rss, false); rmt.prepareMutationsForMerge(mergedRegion.getRegionInfo(), indexRegionA.getRegionInfo(), indexRegionB.getRegionInfo(), - rss.getServerName(), metaEntries); + rss.getServerName(), metaEntries, + mergedRegion.getTableDesc().getRegionReplication()); } catch (Exception e) { ctx.bypass(); LOG.warn("index regions merge failed with the exception ", e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java index ce18cc2..69fc6f6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/cache/aggcache/SpillableGroupByCache.java @@ -120,7 +120,7 @@ public class SpillableGroupByCache implements GroupByCache { /** * Instantiates a Loading LRU Cache that stores key / aggregator[] tuples used for group by queries - * + * * @param estSize * @param estValueSize * @param aggs @@ -325,7 +325,7 @@ public class SpillableGroupByCache implements GroupByCache { /** * Closes cache and releases spill resources - * + * * @throws IOException */ @Override @@ -358,7 +358,9 @@ public class SpillableGroupByCache implements GroupByCache { @Override public boolean next(List<Cell> results) throws IOException { - if (!cacheIter.hasNext()) { return false; } + if (!cacheIter.hasNext()) { + return false; + } Map.Entry<ImmutableBytesWritable, Aggregator[]> ce = cacheIter.next(); ImmutableBytesWritable key = ce.getKey(); Aggregator[] aggs = ce.getValue(); @@ -377,6 +379,11 @@ public class SpillableGroupByCache implements GroupByCache { public long getMaxResultSize() { return s.getMaxResultSize(); } + + @Override + public int getBatch() { + return s.getBatch(); + } }; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java index ff9ac76..828f776 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseRegionScanner.java @@ -22,14 +22,14 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.DoNotRetryIOException; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; public abstract class BaseRegionScanner implements RegionScanner { @Override public boolean isFilterDone() { - return false; + return false; } @Override @@ -38,10 +38,10 @@ public abstract class BaseRegionScanner implements RegionScanner { } @Override - public boolean next(List<Cell> result, int limit) throws IOException { + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { return next(result); } - + @Override public boolean reseek(byte[] row) throws IOException { throw new DoNotRetryIOException("Unsupported"); @@ -58,7 +58,7 @@ public abstract class BaseRegionScanner implements RegionScanner { } @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { - return next(result, limit); + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { + return next(result, scannerContext); } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index a2269b4..fc74968 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Bytes; import org.apache.htrace.Span; import org.apache.htrace.Trace; @@ -60,7 +61,7 @@ import com.google.common.collect.ImmutableList; abstract public class BaseScannerRegionObserver extends BaseRegionObserver { - + public static final String AGGREGATORS = "_Aggs"; public static final String UNORDERED_GROUP_BY_EXPRESSIONS = "_UnorderedGroupByExpressions"; public static final String KEY_ORDERED_GROUP_BY_EXPRESSIONS = "_OrderedGroupByExpressions"; @@ -91,7 +92,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { * Attribute name used to pass custom annotations in Scans and Mutations (later). Custom annotations * are used to augment log lines emitted by Phoenix. See https://issues.apache.org/jira/browse/PHOENIX-1198. */ - public static final String CUSTOM_ANNOTATIONS = "_Annot"; + public static final String CUSTOM_ANNOTATIONS = "_Annot"; /** Exposed for testing */ public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server"; @@ -111,8 +112,8 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public String toString() { return this.getClass().getName(); } - - + + private static void throwIfScanOutOfRegion(Scan scan, HRegion region) throws DoNotRetryIOException { boolean isLocalIndex = ScanUtil.isLocalIndex(scan); byte[] lowerInclusiveScanKey = scan.getStartRow(); @@ -136,7 +137,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { abstract protected boolean isRegionObserverFor(Scan scan); abstract protected RegionScanner doPostScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable; - + @Override public RegionScanner preScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { @@ -153,7 +154,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { /** * Wrapper for {@link #postScannerOpen(ObserverContext, Scan, RegionScanner)} that ensures no non IOException is thrown, * to prevent the coprocessor from becoming blacklisted. - * + * */ @Override public final RegionScanner postScannerOpen( @@ -165,10 +166,10 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { } boolean success =false; // Save the current span. When done with the child span, reset the span back to - // what it was. Otherwise, this causes the thread local storing the current span + // what it was. Otherwise, this causes the thread local storing the current span // to not be reset back to null causing catastrophic infinite loops // and region servers to crash. See https://issues.apache.org/jira/browse/PHOENIX-1596 - // TraceScope can't be used here because closing the scope will end up calling + // TraceScope can't be used here because closing the scope will end up calling // currentSpan.stop() and that should happen only when we are closing the scanner. final Span savedSpan = Trace.currentSpan(); final Span child = Trace.startSpan(SCANNER_OPENED_TRACE_INFO, savedSpan).getSpan(); @@ -226,7 +227,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { return getWrappedScanner(c, s, null, null, offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainer, viewConstants, null, null, projector, ptr); } - + /** * Return wrapped scanner that catches unexpected exceptions (i.e. Phoenix bugs) and * re-throws as DoNotRetryIOException to prevent needless retrying hanging the query @@ -246,7 +247,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { final Expression[] arrayFuncRefs, final int offset, final Scan scan, final ColumnReference[] dataColumns, final TupleProjector tupleProjector, final HRegion dataRegion, final IndexMaintainer indexMaintainer, - final byte[][] viewConstants, final KeyValueSchema kvSchema, + final byte[][] viewConstants, final KeyValueSchema kvSchema, final ValueBitSet kvSchemaBitSet, final TupleProjector projector, final ImmutableBytesWritable ptr) { return new RegionScanner() { @@ -262,9 +263,9 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { } @Override - public boolean next(List<Cell> result, int limit) throws IOException { + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { try { - return s.next(result, limit); + return s.next(result, scannerContext); } catch (Throwable t) { ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); return false; // impossible @@ -324,30 +325,31 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { } @Override - public boolean nextRaw(List<Cell> result, int limit) throws IOException { - try { - boolean next = s.nextRaw(result, limit); - if (result.size() == 0) { - return next; - } - if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) { - replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result); - } - if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) { - IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns, - tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); - } - if (projector != null) { - Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result))); - result.clear(); - result.add(tuple.getValue(0)); - } - // There is a scanattribute set to retrieve the specific array element + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) + throws IOException { + try { + boolean next = s.nextRaw(result, scannerContext); + if (result.size() == 0) { return next; - } catch (Throwable t) { - ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); - return false; // impossible } + if (arrayFuncRefs != null && arrayFuncRefs.length > 0 && arrayKVRefs.size() > 0) { + replaceArrayIndexElement(arrayKVRefs, arrayFuncRefs, result); + } + if ((offset > 0 || ScanUtil.isLocalIndex(scan)) && !ScanUtil.isAnalyzeTable(scan)) { + IndexUtil.wrapResultUsingOffset(c, result, offset, dataColumns, + tupleProjector, dataRegion, indexMaintainer, viewConstants, ptr); + } + if (projector != null) { + Tuple tuple = projector.projectResults(new ResultTuple(Result.create(result))); + result.clear(); + result.add(tuple.getValue(0)); + } + // There is a scanattribute set to retrieve the specific array element + return next; + } catch (Throwable t) { + ServerUtil.throwIOException(c.getEnvironment().getRegion().getRegionNameAsString(), t); + return false; // impossible + } } private void replaceArrayIndexElement(final Set<KeyValueColumnExpression> arrayKVRefs, @@ -387,6 +389,11 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { public long getMaxResultSize() { return s.getMaxResultSize(); } + + @Override + public int getBatch() { + return s.getBatch(); + } }; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java index f88a931..43c35a8 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -22,6 +22,7 @@ import java.util.List; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; public class DelegateRegionScanner implements RegionScanner { @@ -56,23 +57,33 @@ public class DelegateRegionScanner implements RegionScanner { delegate.close(); } + @Override public long getMaxResultSize() { return delegate.getMaxResultSize(); } - public boolean next(List<Cell> arg0, int arg1) throws IOException { - return delegate.next(arg0, arg1); + @Override + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + return delegate.next(result, scannerContext); } - public boolean next(List<Cell> arg0) throws IOException { - return delegate.next(arg0); + @Override + public boolean next(List<Cell> result) throws IOException { + return delegate.next(result); } - public boolean nextRaw(List<Cell> arg0, int arg1) throws IOException { - return delegate.nextRaw(arg0, arg1); + @Override + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { + return delegate.nextRaw(result, scannerContext); } + @Override public boolean nextRaw(List<Cell> arg0) throws IOException { return delegate.nextRaw(arg0); } + + @Override + public int getBatch() { + return delegate.getBatch(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index 1f1ba36..19a1663 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -80,7 +80,7 @@ import com.google.common.collect.Maps; /** * Region observer that aggregates grouped rows (i.e. SQL query with GROUP BY clause) - * + * * @since 0.1 */ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { @@ -116,7 +116,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { offset = region.getStartKey().length != 0 ? region.getStartKey().length:region.getEndKey().length; ScanUtil.setRowKeyOffset(scan, offset); } - + List<Expression> expressions = deserializeGroupByExpressions(expressionBytes, 0); ServerAggregators aggregators = ServerAggregators.deserialize(scan @@ -124,7 +124,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { .getEnvironment().getConfiguration()); RegionScanner innerScanner = s; - + byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes); TupleProjector tupleProjector = null; @@ -142,9 +142,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); innerScanner = - getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, + getWrappedScanner(c, innerScanner, offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr); - } + } if (j != null) { innerScanner = @@ -223,13 +223,13 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { } /** - * + * * Cache for distinct values and their aggregations which is completely * in-memory (as opposed to spilling to disk). Used when GROUPBY_SPILLABLE_ATTRIB * is set to false. The memory usage is tracked at a coursed grain and will * throw and abort if too much is used. * - * + * * @since 3.0.0 */ private static final class InMemoryGroupByCache implements GroupByCache { @@ -238,9 +238,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { private final ServerAggregators aggregators; private final RegionCoprocessorEnvironment env; private final byte[] customAnnotations; - + private int estDistVals; - + InMemoryGroupByCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) { int estValueSize = aggregators.getEstimatedByteSize(); long estSize = sizeOfUnorderedGroupByMap(estDistVals, estValueSize); @@ -252,7 +252,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { this.chunk = tenantCache.getMemoryManager().allocate(estSize); this.customAnnotations = customAnnotations; } - + @Override public void close() throws IOException { this.chunk.close(); @@ -291,7 +291,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { chunk.resize(estSize); final List<KeyValue> aggResults = new ArrayList<KeyValue>(aggregateMap.size()); - + final Iterator<Map.Entry<ImmutableBytesPtr, Aggregator[]>> cacheIter = aggregateMap.entrySet().iterator(); while (cacheIter.hasNext()) { @@ -333,7 +333,9 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { @Override public boolean next(List<Cell> results) throws IOException { - if (index >= aggResults.size()) return false; + if (index >= aggResults.size()) { + return false; + } results.add(aggResults.get(index)); index++; return index < aggResults.size(); @@ -343,6 +345,11 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { public long getMaxResultSize() { return s.getMaxResultSize(); } + + @Override + public int getBatch() { + return s.getBatch(); + } }; } @@ -350,22 +357,22 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { public long size() { return aggregateMap.size(); } - + } private static final class GroupByCacheFactory { public static final GroupByCacheFactory INSTANCE = new GroupByCacheFactory(); - + private GroupByCacheFactory() { } - + GroupByCache newCache(RegionCoprocessorEnvironment env, ImmutableBytesWritable tenantId, byte[] customAnnotations, ServerAggregators aggregators, int estDistVals) { Configuration conf = env.getConfiguration(); boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); if (spillableEnabled) { return new SpillableGroupByCache(env, tenantId, aggregators, estDistVals); - } - + } + return new InMemoryGroupByCache(env, tenantId, customAnnotations, aggregators, estDistVals); } } @@ -388,14 +395,14 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { byte[] estDistValsBytes = scan.getAttribute(BaseScannerRegionObserver.ESTIMATED_DISTINCT_VALUES); if (estDistValsBytes != null) { // Allocate 1.5x estimation - estDistVals = Math.max(MIN_DISTINCT_VALUES, + estDistVals = Math.max(MIN_DISTINCT_VALUES, (int) (Bytes.toInt(estDistValsBytes) * 1.5f)); } final boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); - GroupByCache groupByCache = + GroupByCache groupByCache = GroupByCacheFactory.INSTANCE.newCache( env, ScanUtil.getTenantId(scan), ScanUtil.getCustomAnnotations(scan), aggregators, estDistVals); @@ -453,7 +460,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { * Used for an aggregate query in which the key order match the group by key order. In this * case, we can do the aggregation as we scan, by detecting when the group by key changes. * @param limit TODO - * @throws IOException + * @throws IOException */ private RegionScanner scanOrdered(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner scanner, final List<Expression> expressions, @@ -559,11 +566,15 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver { currentKey = null; return false; } - + @Override public long getMaxResultSize() { return scanner.getMaxResultSize(); } + @Override + public int getBatch() { + return scanner.getBatch(); + } }; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java index cdfc771..1e34d96 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/HashJoinRegionScanner.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.hadoop.hbase.util.Bytes; import org.apache.phoenix.cache.GlobalCache; import org.apache.phoenix.cache.HashCache; @@ -48,7 +49,7 @@ import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.TupleUtil; public class HashJoinRegionScanner implements RegionScanner { - + private final RegionScanner scanner; private final TupleProjector projector; private final HashJoinInfo joinInfo; @@ -60,7 +61,7 @@ public class HashJoinRegionScanner implements RegionScanner { private List<Tuple>[] tempTuples; private ValueBitSet tempDestBitSet; private ValueBitSet[] tempSrcBitSet; - + @SuppressWarnings("unchecked") public HashJoinRegionScanner(RegionScanner scanner, TupleProjector projector, HashJoinInfo joinInfo, ImmutableBytesWritable tenantId, RegionCoprocessorEnvironment env) throws IOException { this.scanner = scanner; @@ -92,8 +93,8 @@ public class HashJoinRegionScanner implements RegionScanner { } HashCache hashCache = (HashCache)cache.getServerCache(joinId); if (hashCache == null) - throw new DoNotRetryIOException("Could not find hash cache for joinId: " - + Bytes.toString(joinId.get(), joinId.getOffset(), joinId.getLength()) + throw new DoNotRetryIOException("Could not find hash cache for joinId: " + + Bytes.toString(joinId.get(), joinId.getOffset(), joinId.getLength()) + ". The cache might have expired and have been removed."); hashCaches[i] = hashCache; tempSrcBitSet[i] = ValueBitSet.newInstance(joinInfo.getSchemas()[i]); @@ -103,18 +104,19 @@ public class HashJoinRegionScanner implements RegionScanner { this.projector.setValueBitSet(tempDestBitSet); } } - + private void processResults(List<Cell> result, boolean hasBatchLimit) throws IOException { if (result.isEmpty()) return; - + Tuple tuple = new ResultTuple(Result.create(result)); // For backward compatibility. In new versions, HashJoinInfo.forceProjection() // always returns true. if (joinInfo.forceProjection()) { tuple = projector.projectResults(tuple); } - + + // TODO: fix below Scanner.next() and Scanner.nextRaw() methods as well. if (hasBatchLimit) throw new UnsupportedOperationException("Cannot support join operations in scans with limit"); @@ -157,7 +159,7 @@ public class HashJoinRegionScanner implements RegionScanner { Tuple lhs = resultQueue.poll(); if (!earlyEvaluation) { ImmutableBytesPtr key = TupleUtil.getConcatenatedValue(lhs, joinInfo.getJoinExpressions()[i]); - tempTuples[i] = hashCaches[i].get(key); + tempTuples[i] = hashCaches[i].get(key); if (tempTuples[i] == null) { if (type == JoinType.Inner || type == JoinType.Semi) { continue; @@ -171,7 +173,7 @@ public class HashJoinRegionScanner implements RegionScanner { Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ? lhs : TupleProjector.mergeProjectedValue( (ProjectedValueTuple) lhs, schema, tempDestBitSet, - null, joinInfo.getSchemas()[i], tempSrcBitSet[i], + null, joinInfo.getSchemas()[i], tempSrcBitSet[i], joinInfo.getFieldPositions()[i]); resultQueue.offer(joined); continue; @@ -180,7 +182,7 @@ public class HashJoinRegionScanner implements RegionScanner { Tuple joined = tempSrcBitSet[i] == ValueBitSet.EMPTY_VALUE_BITSET ? lhs : TupleProjector.mergeProjectedValue( (ProjectedValueTuple) lhs, schema, tempDestBitSet, - t, joinInfo.getSchemas()[i], tempSrcBitSet[i], + t, joinInfo.getSchemas()[i], tempSrcBitSet[i], joinInfo.getFieldPositions()[i]); resultQueue.offer(joined); } @@ -211,18 +213,19 @@ public class HashJoinRegionScanner implements RegionScanner { } } } - + private boolean shouldAdvance() { if (!resultQueue.isEmpty()) return false; - + return hasMore; } - + private boolean nextInQueue(List<Cell> results) { - if (resultQueue.isEmpty()) + if (resultQueue.isEmpty()) { return false; - + } + Tuple tuple = resultQueue.poll(); for (int i = 0; i < tuple.size(); i++) { results.add(tuple.getValue(i)); @@ -252,19 +255,19 @@ public class HashJoinRegionScanner implements RegionScanner { processResults(result, false); result.clear(); } - + return nextInQueue(result); } @Override - public boolean nextRaw(List<Cell> result, int limit) + public boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException { while (shouldAdvance()) { - hasMore = scanner.nextRaw(result, limit); - processResults(result, true); + hasMore = scanner.nextRaw(result, scannerContext); + processResults(result, false); // TODO fix honoring the limit result.clear(); } - + return nextInQueue(result); } @@ -285,19 +288,19 @@ public class HashJoinRegionScanner implements RegionScanner { processResults(result, false); result.clear(); } - + return nextInQueue(result); } @Override - public boolean next(List<Cell> result, int limit) throws IOException { + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { while (shouldAdvance()) { - hasMore = scanner.next(result, limit); - processResults(result, true); + hasMore = scanner.next(result, scannerContext); + processResults(result, false); // TODO honoring the limit result.clear(); } - - return nextInQueue(result); + + return nextInQueue(result); } @Override @@ -305,5 +308,10 @@ public class HashJoinRegionScanner implements RegionScanner { return this.scanner.getMaxResultSize(); } + @Override + public int getBatch() { + return this.scanner.getBatch(); + } + } http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 6f1d5ac..c40e3cd 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -69,20 +69,20 @@ public class MetaDataRegionObserver extends BaseRegionObserver { protected ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); private boolean enableRebuildIndex = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD; private long rebuildIndexTimeInterval = QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL; - + @Override public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c, boolean abortRequested) { executor.shutdownNow(); GlobalCache.getInstance(c.getEnvironment()).getMetaDataCache().invalidateAll(); } - + @Override public void start(CoprocessorEnvironment env) throws IOException { - // sleep a little bit to compensate time clock skew when SYSTEM.CATALOG moves + // sleep a little bit to compensate time clock skew when SYSTEM.CATALOG moves // among region servers because we relies on server time of RS which is hosting // SYSTEM.CATALOG - long sleepTime = env.getConfiguration().getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB, + long sleepTime = env.getConfiguration().getLong(QueryServices.CLOCK_SKEW_INTERVAL_ATTRIB, QueryServicesOptions.DEFAULT_CLOCK_SKEW_INTERVAL); try { if(sleepTime > 0) { @@ -91,12 +91,12 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } - enableRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, + enableRebuildIndex = env.getConfiguration().getBoolean(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD); - rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, + rebuildIndexTimeInterval = env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_INTERVAL_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL); } - + @Override public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) { @@ -119,7 +119,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { LOG.error("BuildIndexScheduleTask cannot start!", ex); } } - + /** * Task runs periodically to build indexes whose INDEX_NEED_PARTIALLY_REBUILD is set true * @@ -133,7 +133,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) { this.env = env; } - + private String getJdbcUrl() { String zkQuorum = this.env.getConfiguration().get(HConstants.ZOOKEEPER_QUORUM); String zkClientPort = this.env.getConfiguration().get(HConstants.ZOOKEEPER_CLIENT_PORT, @@ -144,7 +144,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver { + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkClientPort + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + zkParentNode; } - + + @Override public void run() { RegionScanner scanner = null; PhoenixConnection conn = null; @@ -199,7 +200,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { PhoenixDatabaseMetaData.INDEX_STATE_BYTES); if ((dataTable == null || dataTable.length == 0) || (indexStat == null || indexStat.length == 0) - || ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0) + || ((Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexStat) != 0) && (Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexStat) != 0))) { // index has to be either in disable or inactive state // data table name can't be empty http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index ddde407..77e124d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -199,7 +199,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { indexMaintainer = indexMaintainers.get(0); viewConstants = IndexUtil.deserializeViewConstantsFromScan(scan); } - + final TupleProjector p = TupleProjector.deserializeProjectorFromScan(scan); final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan); innerScanner = @@ -285,12 +285,12 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { } finally { try { if(iterator != null) { - iterator.close(); + iterator.close(); } } catch (SQLException e) { ServerUtil.throwIOException(region.getRegionNameAsString(), e); } finally { - chunk.close(); + chunk.close(); } } } @@ -299,6 +299,11 @@ public class ScanRegionObserver extends BaseScannerRegionObserver { public long getMaxResultSize() { return s.getMaxResultSize(); } + + @Override + public int getBatch() { + return s.getBatch(); + } }; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index e43e5e5..2d6d98a 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -101,8 +101,8 @@ import com.google.common.collect.Sets; /** * Region observer that aggregates ungrouped rows(i.e. SQL query with aggregation function and no GROUP BY). - * - * + * + * * @since 0.1 */ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ @@ -116,7 +116,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ public static final String EMPTY_CF = "EmptyCF"; private static final Logger logger = LoggerFactory.getLogger(UngroupedAggregateRegionObserver.class); private KeyValueBuilder kvBuilder; - + @Override public void start(CoprocessorEnvironment e) throws IOException { super.start(e); @@ -139,14 +139,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ public static void serializeIntoScan(Scan scan) { scan.setAttribute(BaseScannerRegionObserver.UNGROUPED_AGG, QueryConstants.TRUE); } - + @Override public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner s) throws IOException { s = super.preScannerOpen(e, scan, s); if (ScanUtil.isAnalyzeTable(scan)) { // We are setting the start row and stop row such that it covers the entire region. As part - // of Phonenix-1263 we are storing the guideposts against the physical table rather than + // of Phonenix-1263 we are storing the guideposts against the physical table rather than // individual tenant specific tables. scan.setStartRow(HConstants.EMPTY_START_ROW); scan.setStopRow(HConstants.EMPTY_END_ROW); @@ -154,7 +154,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } return s; } - + @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws IOException { int offset = 0; @@ -179,9 +179,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD); List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? null : IndexMaintainer.deserialize(localIndexBytes); List<Mutation> indexMutations = localIndexBytes == null ? Collections.<Mutation>emptyList() : Lists.<Mutation>newArrayListWithExpectedSize(1024); - + RegionScanner theScanner = s; - + byte[] indexUUID = scan.getAttribute(PhoenixIndexCodec.INDEX_UUID); PTable projectedTable = null; List<Expression> selectExpressions = null; @@ -226,14 +226,14 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } ImmutableBytesWritable tempPtr = new ImmutableBytesWritable(); theScanner = - getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, + getWrappedScanner(c, theScanner, offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr); - } - + } + if (j != null) { theScanner = new HashJoinRegionScanner(theScanner, p, j, ScanUtil.getTenantId(scan), c.getEnvironment()); } - + int batchSize = 0; List<Mutation> mutations = Collections.emptyList(); boolean buildLocalIndex = indexMaintainers != null && dataColumns==null && !localIndexScan; @@ -330,7 +330,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } column.getDataType().coerceBytes(ptr, value, expression.getDataType(), expression.getMaxLength(), - expression.getScale(), expression.getSortOrder(), + expression.getScale(), expression.getSortOrder(), column.getMaxLength(), column.getScale(), column.getSortOrder()); byte[] bytes = ByteUtil.copyKeyBytesIfNecessary(ptr); @@ -418,7 +418,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } } } - + if (logger.isDebugEnabled()) { logger.debug(LogUtil.addCustomAnnotations("Finished scanning " + rowCount + " rows for ungrouped coprocessor scan " + scan, ScanUtil.getCustomAnnotations(scan))); } @@ -438,7 +438,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ keyValue = KeyValueUtil.newKeyValue(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, AGG_TIMESTAMP, value, 0, value.length); } final KeyValue aggKeyValue = keyValue; - + RegionScanner scanner = new BaseRegionScanner() { private boolean done = !hadAny; @@ -464,11 +464,16 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ results.add(aggKeyValue); return false; } - + @Override public long getMaxResultSize() { return scan.getMaxResultSize(); } + + @Override + public int getBatch() { + return innerScanner.getBatch(); + } }; return scanner; } @@ -496,7 +501,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } indexMutations.clear(); } - + @Override public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> c, final Store store, InternalScanner scanner, final ScanType scanType) @@ -505,8 +510,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ InternalScanner internalScanner = scanner; if (scanType.equals(ScanType.COMPACT_DROP_DELETES)) { try { - boolean useCurrentTime = - c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + boolean useCurrentTime = + c.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME); // Provides a means of clients controlling their timestamps to not use current time // when background tasks are updating stats. Instead we track the max timestamp of @@ -526,8 +531,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ } return internalScanner; } - - + + @Override public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e, HRegion l, HRegion r) throws IOException { @@ -535,8 +540,8 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ TableName table = region.getRegionInfo().getTable(); StatisticsCollector stats = null; try { - boolean useCurrentTime = - e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, + boolean useCurrentTime = + e.getEnvironment().getConfiguration().getBoolean(QueryServices.STATS_USE_CURRENT_TIME_ATTRIB, QueryServicesOptions.DEFAULT_STATS_USE_CURRENT_TIME); // Provides a means of clients controlling their timestamps to not use current time // when background tasks are updating stats. Instead we track the max timestamp of @@ -544,7 +549,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ long clientTimeStamp = useCurrentTime ? TimeKeeper.SYSTEM.getCurrentTime() : StatisticsCollector.NO_TIMESTAMP; stats = new StatisticsCollector(e.getEnvironment(), table.getNameAsString(), clientTimeStamp); stats.splitStats(region, l, r); - } catch (IOException ioe) { + } catch (IOException ioe) { if(logger.isWarnEnabled()) { logger.warn("Error while collecting stats during split for " + table,ioe); } @@ -559,7 +564,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver{ return PTableImpl.createFromProto(ptableProto); } catch (IOException e) { throw new RuntimeException(e); - } + } } private static List<Expression> deserializeExpressions(byte[] b) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java index 3469042..71cc1d6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/data/LocalTable.java @@ -70,4 +70,4 @@ public class LocalTable implements LocalHBaseState { scanner.close(); return r; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java index 68555ef..d39b01d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/covered/filter/FamilyOnlyFilter.java @@ -58,14 +58,14 @@ public class FamilyOnlyFilter extends FamilyFilter { @Override public ReturnCode filterKeyValue(Cell v) { if (done) { - return ReturnCode.SKIP; + return ReturnCode.NEXT_ROW; } ReturnCode code = super.filterKeyValue(v); if (previousMatchFound) { // we found a match before, and now we are skipping the key because of the family, therefore // we are done (no more of the family). - if (code.equals(ReturnCode.SKIP)) { - done = true; + if (code.equals(ReturnCode.SKIP) || code.equals(ReturnCode.NEXT_ROW)) { + done = true; } } else { // if we haven't seen a match before, then it doesn't matter what we see now, except to mark http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java index e225696..435a1c0 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/hbase/index/scanner/FilteredKeyValueScanner.java @@ -57,7 +57,7 @@ public class FilteredKeyValueScanner implements KeyValueScanner { /** * Same a {@link KeyValueScanner#next()} except that we filter out the next {@link KeyValue} until we find one that * passes the filter. - * + * * @return the next {@link KeyValue} or <tt>null</tt> if no next {@link KeyValue} is present and passes all the * filters. */ http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java index b89c807..b5e6a63 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexBuilder.java @@ -59,14 +59,14 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder { Mutation m = miniBatchOp.getOperation(i); keys.add(PVarbinary.INSTANCE.getKeyRange(m.getRow())); List<IndexMaintainer> indexMaintainers = getCodec().getIndexMaintainers(m.getAttributesMap()); - + for(IndexMaintainer indexMaintainer: indexMaintainers) { if (indexMaintainer.isImmutableRows() && indexMaintainer.isLocalIndex()) continue; indexTableName.set(indexMaintainer.getIndexTableName()); if (maintainers.get(indexTableName) != null) continue; maintainers.put(indexTableName, indexMaintainer); } - + } if (maintainers.isEmpty()) return; Scan scan = IndexManagementUtil.newLocalStateScan(new ArrayList<IndexMaintainer>(maintainers.values())); @@ -100,7 +100,7 @@ public class PhoenixIndexBuilder extends CoveredColumnsIndexBuilder { private PhoenixIndexCodec getCodec() { return (PhoenixIndexCodec)this.codec; } - + @Override public byte[] getBatchId(Mutation m){ return this.codec.getBatchId(m); http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java index 88e141a..52fbe9c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerResultIterator.java @@ -31,15 +31,15 @@ import org.apache.phoenix.util.ServerUtil; public class RegionScannerResultIterator extends BaseResultIterator { private final RegionScanner scanner; - + public RegionScannerResultIterator(RegionScanner scanner) { this.scanner = scanner; } - + @Override public Tuple next() throws SQLException { - // XXX: No access here to the region instance to enclose this with startRegionOperation / - // stopRegionOperation + // XXX: No access here to the region instance to enclose this with startRegionOperation / + // stopRegionOperation synchronized (scanner) { try { // TODO: size @@ -48,6 +48,7 @@ public class RegionScannerResultIterator extends BaseResultIterator { // since this is an indication of whether or not there are more values after the // ones returned boolean hasMore = scanner.nextRaw(results); + if (!hasMore && results.isEmpty()) { return null; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index de59304..0e50923 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; /** @@ -58,15 +59,15 @@ public class StatisticsScanner implements InternalScanner { } @Override - public boolean next(List<Cell> result, int limit) throws IOException { - boolean ret = delegate.next(result, limit); + public boolean next(List<Cell> result, ScannerContext scannerContext) throws IOException { + boolean ret = delegate.next(result, scannerContext); updateStat(result); return ret; } /** * Update the current statistics based on the lastest batch of key-values from the underlying scanner - * + * * @param results * next batch of {@link KeyValue}s */ @@ -122,4 +123,5 @@ public class StatisticsScanner implements InternalScanner { } } } -} \ No newline at end of file + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java index 12f1863..030b114 100644 --- a/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java +++ b/phoenix-core/src/test/java/org/apache/hadoop/hbase/ipc/PhoenixIndexRpcSchedulerTest.java @@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ipc.RpcScheduler.Context; +import org.apache.hadoop.hbase.ipc.RpcServer.Connection; import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; import org.junit.Test; import org.mockito.Mockito; @@ -86,11 +87,12 @@ public class PhoenixIndexRpcSchedulerTest { } private void dispatchCallWithPriority(RpcScheduler scheduler, int priority) throws Exception { + Connection connection = Mockito.mock(Connection.class); CallRunner task = Mockito.mock(CallRunner.class); RequestHeader header = RequestHeader.newBuilder().setPriority(priority).build(); RpcServer server = new RpcServer(null, "test-rpcserver", null, isa, conf, scheduler); RpcServer.Call call = - server.new Call(0, null, null, header, null, null, null, null, 10, null); + server.new Call(0, null, null, header, null, null, connection, null, 10, null, null); Mockito.when(task.getCall()).thenReturn(call); scheduler.dispatch(task); @@ -98,4 +100,4 @@ public class PhoenixIndexRpcSchedulerTest { Mockito.verify(task).getCall(); Mockito.verifyNoMoreInteractions(task); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java index 54db5d8..e996b23 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/TestLocalTableState.java @@ -37,7 +37,6 @@ import org.junit.Test; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; - import org.apache.phoenix.hbase.index.covered.IndexUpdate; import org.apache.phoenix.hbase.index.covered.LocalTableState; import org.apache.phoenix.hbase.index.covered.data.LocalHBaseState; http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java index 216f548..808e6bc 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/covered/filter/TestFamilyOnlyFilter.java @@ -47,7 +47,7 @@ public class TestFamilyOnlyFilter { kv = new KeyValue(row, fam2, qual, 10, val); code = filter.filterKeyValue(kv); - assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code); + assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code); } @Test @@ -61,7 +61,7 @@ public class TestFamilyOnlyFilter { KeyValue kv = new KeyValue(row, fam, qual, 10, val); ReturnCode code = filter.filterKeyValue(kv); - assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code); + assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code); kv = new KeyValue(row, fam2, qual, 10, val); code = filter.filterKeyValue(kv); @@ -69,7 +69,7 @@ public class TestFamilyOnlyFilter { kv = new KeyValue(row, fam3, qual, 10, val); code = filter.filterKeyValue(kv); - assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code); + assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code); } @Test @@ -83,7 +83,7 @@ public class TestFamilyOnlyFilter { KeyValue kv = new KeyValue(row, fam, qual, 10, val); ReturnCode code = filter.filterKeyValue(kv); - assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code); + assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code); KeyValue accept = new KeyValue(row, fam2, qual, 10, val); code = filter.filterKeyValue(accept); @@ -91,12 +91,12 @@ public class TestFamilyOnlyFilter { kv = new KeyValue(row, fam3, qual, 10, val); code = filter.filterKeyValue(kv); - assertEquals("Didn't filter out non-matching family!", ReturnCode.SKIP, code); + assertEquals("Didn't filter out non-matching family!", ReturnCode.NEXT_ROW, code); // we shouldn't match the family again - everything after a switched family should be ignored code = filter.filterKeyValue(accept); assertEquals("Should have skipped a 'matching' family if it arrives out of order", - ReturnCode.SKIP, code); + ReturnCode.NEXT_ROW, code); // reset the filter and we should accept it again filter.reset(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java index 60c11d7..ae577bd 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/hbase/index/write/TestWALRecoveryCaching.java @@ -317,9 +317,9 @@ public class TestWALRecoveryCaching { } LOG.info("Starting region server:" + server.getHostname()); - cluster.startRegionServer(server.getHostname()); + cluster.startRegionServer(server.getHostname(), server.getPort()); - cluster.waitForRegionServerToStart(server.getHostname(), TIMEOUT); + cluster.waitForRegionServerToStart(server.getHostname(), server.getPort(), TIMEOUT); // start a server to get back to the base number of servers LOG.info("STarting server to replace " + server); http://git-wip-us.apache.org/repos/asf/phoenix/blob/98271b88/phoenix-flume/pom.xml ---------------------------------------------------------------------- diff --git a/phoenix-flume/pom.xml b/phoenix-flume/pom.xml index 7ed0801..b2b9a47 100644 --- a/phoenix-flume/pom.xml +++ b/phoenix-flume/pom.xml @@ -85,7 +85,6 @@ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-testing-util</artifactId> - <version>${hbase.version}</version> <scope>test</scope> <optional>true</optional> <exclusions> @@ -98,7 +97,6 @@ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-it</artifactId> - <version>${hbase.version}</version> <type>test-jar</type> <scope>test</scope> <exclusions> @@ -111,41 +109,34 @@ <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-common</artifactId> - <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-protocol</artifactId> - <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> - <version>${hbase.version}</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop-compat</artifactId> - <version>${hbase.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop-compat</artifactId> - <version>${hbase.version}</version> <type>test-jar</type> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop2-compat</artifactId> - <version>${hbase.version}</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-hadoop2-compat</artifactId> - <version>${hbase.version}</version> <type>test-jar</type> <scope>test</scope> </dependency>