PHOENIX-4366 Rebuilding a local index fails sometimes
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b520d0e6 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b520d0e6 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b520d0e6 Branch: refs/heads/5.x-HBase-2.0 Commit: b520d0e6a3be5f6afcdc60039c5186643ed567ab Parents: 6e28d29 Author: James Taylor <jtay...@salesforce.com> Authored: Wed Apr 11 13:37:45 2018 -0700 Committer: James Taylor <jtay...@salesforce.com> Committed: Wed Apr 11 14:05:43 2018 -0700 ---------------------------------------------------------------------- .../coprocessor/BaseScannerRegionObserver.java | 8 ++---- .../GroupedAggregateRegionObserver.java | 4 +++ .../phoenix/coprocessor/ScanRegionObserver.java | 4 ++- .../UngroupedAggregateRegionObserver.java | 7 ++++- .../NonAggregateRegionScannerFactory.java | 29 +++++++++----------- .../phoenix/iterate/RegionScannerFactory.java | 4 +-- .../apache/phoenix/iterate/SnapshotScanner.java | 6 +--- 7 files changed, 31 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b520d0e6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 5c2e639..7e4e63c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -131,8 +131,6 @@ abstract public class BaseScannerRegionObserver implements RegionObserver { /** Exposed for testing */ public static final String SCANNER_OPENED_TRACE_INFO = "Scanner opened on server"; - protected QualifierEncodingScheme encodingScheme; - protected boolean useNewValueColumnQualifier; /** * Used by logger to identify coprocessor @@ -198,8 +196,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver { // start exclusive and the stop inclusive. ScanUtil.setupReverseScan(scan); } - this.encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); - this.useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); + return s; } private class RegionScannerHolder extends DelegateRegionScanner { @@ -336,8 +333,7 @@ abstract public class BaseScannerRegionObserver implements RegionObserver { final byte[][] viewConstants, final TupleProjector projector, final ImmutableBytesWritable ptr, final boolean useQualiferAsListIndex) { - RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(c.getEnvironment(), - useNewValueColumnQualifier, encodingScheme); + RegionScannerFactory regionScannerFactory = new NonAggregateRegionScannerFactory(c.getEnvironment()); return regionScannerFactory.getWrappedScanner(c.getEnvironment(), s, null, null, offset, scan, dataColumns, tupleProjector, dataRegion, indexMaintainer, null, viewConstants, null, null, projector, ptr, useQualiferAsListIndex); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b520d0e6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java index a14c5a2..5d89e8e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java @@ -64,6 +64,7 @@ import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.join.HashJoinInfo; import org.apache.phoenix.memory.MemoryManager.MemoryChunk; import org.apache.phoenix.query.QueryConstants; +import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.SortOrder; import org.apache.phoenix.schema.tuple.EncodedColumnQualiferCellsList; import org.apache.phoenix.schema.tuple.MultiKeyValueTuple; @@ -118,6 +119,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im keyOrdered = true; } int offset = 0; + boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); if (ScanUtil.isLocalIndex(scan)) { /* * For local indexes, we need to set an offset on row key expressions to skip @@ -403,6 +405,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan)); final boolean spillableEnabled = conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, DEFAULT_GROUPBY_SPILLABLE); + final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); GroupByCache groupByCache = GroupByCacheFactory.INSTANCE.newCache( @@ -474,6 +477,7 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver im } final Pair<Integer, Integer> minMaxQualifiers = EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan); final boolean useQualifierAsIndex = EncodedColumnsUtil.useQualifierAsIndex(minMaxQualifiers); + final PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); return new BaseRegionScanner(scanner) { private long rowCount = 0; private ImmutableBytesPtr currentKey = null; http://git-wip-us.apache.org/repos/asf/phoenix/blob/b520d0e6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java index a995f45..1aba5e9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/ScanRegionObserver.java @@ -32,6 +32,8 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.io.WritableUtils; import org.apache.phoenix.expression.OrderByExpression; import org.apache.phoenix.iterate.NonAggregateRegionScannerFactory; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.util.EncodedColumnsUtil; /** * @@ -75,7 +77,7 @@ public class ScanRegionObserver extends BaseScannerRegionObserver implements Reg @Override protected RegionScanner doPostScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c, final Scan scan, final RegionScanner s) throws Throwable { - NonAggregateRegionScannerFactory nonAggregateROUtil = new NonAggregateRegionScannerFactory(c.getEnvironment(), useNewValueColumnQualifier, encodingScheme); + NonAggregateRegionScannerFactory nonAggregateROUtil = new NonAggregateRegionScannerFactory(c.getEnvironment()); return nonAggregateROUtil.getRegionScanner(scan, s); } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b520d0e6/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index 4697b99..37d2b4d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -377,7 +377,12 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver env, region.getRegionInfo().getTable().getNameAsString(), ts, gp_width_bytes, gp_per_region_bytes); return collectStats(s, statsCollector, region, scan, env.getConfiguration()); - } else if (ScanUtil.isIndexRebuild(scan)) { return rebuildIndices(s, region, scan, env.getConfiguration()); } + } else if (ScanUtil.isIndexRebuild(scan)) { + return rebuildIndices(s, region, scan, env.getConfiguration()); + } + + PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); + boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); int offsetToBe = 0; if (localIndexScan) { /* http://git-wip-us.apache.org/repos/asf/phoenix/blob/b520d0e6/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java index 41d60bf..b610134 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/NonAggregateRegionScannerFactory.java @@ -74,20 +74,13 @@ import static org.apache.phoenix.util.EncodedColumnsUtil.getMinMaxQualifiersFrom public class NonAggregateRegionScannerFactory extends RegionScannerFactory { - private ImmutableBytesWritable ptr = new ImmutableBytesWritable(); - private KeyValueSchema kvSchema = null; - private ValueBitSet kvSchemaBitSet; - - public NonAggregateRegionScannerFactory(RegionCoprocessorEnvironment env, boolean useNewValueColumnQualifier, - PTable.QualifierEncodingScheme encodingScheme) { + public NonAggregateRegionScannerFactory(RegionCoprocessorEnvironment env) { this.env = env; - this.useNewValueColumnQualifier = useNewValueColumnQualifier; - this.encodingScheme = encodingScheme; } @Override public RegionScanner getRegionScanner(final Scan scan, final RegionScanner s) throws Throwable { - + ImmutableBytesWritable ptr = new ImmutableBytesWritable(); int offset = 0; if (ScanUtil.isLocalIndex(scan)) { /* @@ -106,9 +99,17 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { scanOffset = (Integer)PInteger.INSTANCE.toObject(scanOffsetBytes); } RegionScanner innerScanner = s; + PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); + boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); Set<KeyValueColumnExpression> arrayKVRefs = Sets.newHashSet(); - Expression[] arrayFuncRefs = deserializeArrayPostionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs); + Expression[] arrayFuncRefs = deserializeArrayPositionalExpressionInfoFromScan(scan, innerScanner, arrayKVRefs); + KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); + for (Expression expression : arrayFuncRefs) { + builder.addField(expression); + } + KeyValueSchema kvSchema = builder.build(); + ValueBitSet kvSchemaBitSet = ValueBitSet.newInstance(kvSchema); TupleProjector tupleProjector = null; Region dataRegion = null; IndexMaintainer indexMaintainer = null; @@ -196,13 +197,12 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { } } - private Expression[] deserializeArrayPostionalExpressionInfoFromScan(Scan scan, RegionScanner s, - Set<KeyValueColumnExpression> arrayKVRefs) { + private Expression[] deserializeArrayPositionalExpressionInfoFromScan(Scan scan, RegionScanner s, + Set<KeyValueColumnExpression> arrayKVRefs) { byte[] specificArrayIdx = scan.getAttribute(BaseScannerRegionObserver.SPECIFIC_ARRAY_INDEX); if (specificArrayIdx == null) { return null; } - KeyValueSchema.KeyValueSchemaBuilder builder = new KeyValueSchema.KeyValueSchemaBuilder(0); ByteArrayInputStream stream = new ByteArrayInputStream(specificArrayIdx); try { DataInputStream input = new DataInputStream(stream); @@ -220,10 +220,7 @@ public class NonAggregateRegionScannerFactory extends RegionScannerFactory { ArrayIndexFunction arrayIdxFunc = new ArrayIndexFunction(); arrayIdxFunc.readFields(input); arrayFuncRefs[i] = arrayIdxFunc; - builder.addField(arrayIdxFunc); } - kvSchema = builder.build(); - kvSchemaBitSet = ValueBitSet.newInstance(kvSchema); return arrayFuncRefs; } catch (IOException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/b520d0e6/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java index 59f08ae..c93945e 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/RegionScannerFactory.java @@ -42,6 +42,7 @@ import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.ValueBitSet; import org.apache.phoenix.schema.tuple.*; import org.apache.phoenix.transaction.PhoenixTransactionContext; +import org.apache.phoenix.util.EncodedColumnsUtil; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; @@ -55,8 +56,6 @@ import java.util.Set; public abstract class RegionScannerFactory { protected RegionCoprocessorEnvironment env; - protected boolean useNewValueColumnQualifier; - protected PTable.QualifierEncodingScheme encodingScheme; /** * Returns the region based on the value of the @@ -108,6 +107,7 @@ public abstract class RegionScannerFactory { private boolean hasReferences = checkForReferenceFiles(); private RegionInfo regionInfo = env.getRegionInfo(); private byte[] actualStartKey = getActualStartKey(); + private boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); // If there are any reference files after local index region merge some cases we might // get the records less than scan start row key. This will happen when we replace the http://git-wip-us.apache.org/repos/asf/phoenix/blob/b520d0e6/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java index cfb3149..5065300 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/SnapshotScanner.java @@ -65,15 +65,11 @@ public class SnapshotScanner extends AbstractClientScanner { values = new ArrayList<>(); this.region = HRegion.openHRegion(conf, fs, rootDir, hri, htd, null, null, null); - // process the region scanner for non-aggregate queries - PTable.QualifierEncodingScheme encodingScheme = EncodedColumnsUtil.getQualifierEncodingScheme(scan); - boolean useNewValueColumnQualifier = EncodedColumnsUtil.useNewValueColumnQualifier(scan); - RegionCoprocessorEnvironment snapshotEnv = getSnapshotContextEnvironment(conf); RegionScannerFactory regionScannerFactory; if (scan.getAttribute(BaseScannerRegionObserver.NON_AGGREGATE_QUERY) != null) { - regionScannerFactory = new NonAggregateRegionScannerFactory(snapshotEnv, useNewValueColumnQualifier, encodingScheme); + regionScannerFactory = new NonAggregateRegionScannerFactory(snapshotEnv); } else { /* future work : Snapshot M/R jobs for aggregate queries*/ throw new UnsupportedOperationException("Snapshot M/R jobs not available for aggregate queries");