http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java index 3fdcde8..09abde4 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataProtocol.java @@ -62,7 +62,7 @@ import com.google.protobuf.ByteString; */ public abstract class MetaDataProtocol extends MetaDataService { public static final int PHOENIX_MAJOR_VERSION = 4; - public static final int PHOENIX_MINOR_VERSION = 11; + public static final int PHOENIX_MINOR_VERSION = 12; public static final int PHOENIX_PATCH_NUMBER = 0; public static final int PHOENIX_VERSION = VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER); @@ -87,8 +87,9 @@ public abstract class MetaDataProtocol extends MetaDataService { public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0 = MIN_TABLE_TIMESTAMP + 20; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0 = MIN_TABLE_TIMESTAMP + 25; public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0 = MIN_TABLE_TIMESTAMP + 27; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0 = MIN_TABLE_TIMESTAMP + 28; // MIN_SYSTEM_TABLE_TIMESTAMP needs to be set to the max of all the MIN_SYSTEM_TABLE_TIMESTAMP_* constants - public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0; + public static final long MIN_SYSTEM_TABLE_TIMESTAMP = MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0; // ALWAYS update this map whenever rolling out a new release (major, minor or patch release). // Key is the SYSTEM.CATALOG timestamp for the version and value is the version string. 
@@ -105,6 +106,7 @@ public abstract class MetaDataProtocol extends MetaDataService { TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_9_0, "4.9.x"); TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_10_0, "4.10.x"); TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_11_0, "4.11.x"); + TIMESTAMP_VERSION_MAP.put(MIN_SYSTEM_TABLE_TIMESTAMP_4_12_0, "4.12.x"); } public static final String CURRENT_CLIENT_VERSION = PHOENIX_MAJOR_VERSION + "." + PHOENIX_MINOR_VERSION + "." + PHOENIX_PATCH_NUMBER;
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java index 4d40f1c..8f02901 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/MetaDataRegionObserver.java @@ -42,18 +42,15 @@ import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.CoprocessorEnvironment; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HTableInterface; -import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.filter.CompareFilter; -import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.RegionScanner; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.log4j.Level; import org.apache.log4j.Logger; @@ -69,7 +66,6 @@ import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; import org.apache.phoenix.jdbc.PhoenixDriver; import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; -import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PName; import 
org.apache.phoenix.schema.PTable; @@ -96,6 +92,7 @@ import com.google.common.collect.Maps; * Coprocessor for metadata related operations. This coprocessor would only be registered * to SYSTEM.TABLE. */ +@SuppressWarnings("deprecation") public class MetaDataRegionObserver extends BaseRegionObserver { public static final Log LOG = LogFactory.getLog(MetaDataRegionObserver.class); public static final String REBUILD_INDEX_APPEND_TO_URL_STRING = "REBUILDINDEX"; @@ -212,9 +209,9 @@ public class MetaDataRegionObserver extends BaseRegionObserver { // running private final static AtomicInteger inProgress = new AtomicInteger(0); RegionCoprocessorEnvironment env; - private long rebuildIndexBatchSize = HConstants.LATEST_TIMESTAMP; - private long configuredBatches = 10; - private long indexDisableTimestampThreshold; + private final long rebuildIndexBatchSize; + private final long configuredBatches; + private final long indexDisableTimestampThreshold; private final ReadOnlyProps props; public BuildIndexScheduleTask(RegionCoprocessorEnvironment env) { @@ -223,7 +220,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { this.rebuildIndexBatchSize = configuration.getLong( QueryServices.INDEX_FAILURE_HANDLING_REBUILD_PERIOD, HConstants.LATEST_TIMESTAMP); this.configuredBatches = configuration.getLong( - QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, configuredBatches); + QueryServices.INDEX_FAILURE_HANDLING_REBUILD_NUMBER_OF_BATCHES_PER_TABLE, 10); this.indexDisableTimestampThreshold = configuration.getLong(QueryServices.INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD, QueryServicesOptions.DEFAULT_INDEX_REBUILD_DISABLE_TIMESTAMP_THRESHOLD); @@ -288,7 +285,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { continue; } - byte[] indexState = CellUtil.cloneValue(indexStateCell); + byte[] indexStateBytes = CellUtil.cloneValue(indexStateCell); byte[][] rowKeyMetaData = new byte[3][]; SchemaUtil.getVarChars(r.getRow(), 3, 
rowKeyMetaData); byte[] schemaName = rowKeyMetaData[PhoenixDatabaseMetaData.SCHEMA_NAME_INDEX]; @@ -340,11 +337,18 @@ public class MetaDataRegionObserver extends BaseRegionObserver { continue; // don't attempt another rebuild irrespective of whether // updateIndexState worked or not } + PIndexState indexState = PIndexState.fromSerializedValue(indexStateBytes[0]); // Allow index to begin incremental maintenance as index is back online and we // cannot transition directly from DISABLED -> ACTIVE - if (Bytes.compareTo(PIndexState.DISABLE.getSerializedBytes(), indexState) == 0) { + if (indexState == PIndexState.DISABLE) { IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.INACTIVE, null); continue; // Must wait until clients start to do index maintenance again + } else if (indexState == PIndexState.PENDING_ACTIVE) { + IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.ACTIVE, null); + continue; // Must wait until clients start to do index maintenance again + } else if (indexState != PIndexState.INACTIVE && indexState != PIndexState.ACTIVE) { + LOG.warn("Unexpected index state of " + indexTableFullName + "=" + indexState + ". Skipping partial rebuild attempt."); + continue; } long currentTime = EnvironmentEdgeManager.currentTimeMillis(); long forwardOverlapDurationMs = env.getConfiguration().getLong( @@ -354,17 +358,9 @@ public class MetaDataRegionObserver extends BaseRegionObserver { if (indexStateCell.getTimestamp() + forwardOverlapDurationMs > currentTime) { continue; // Haven't waited long enough yet } - Long upperBoundOfRebuild = HConstants.LATEST_TIMESTAMP; - if (Bytes.compareTo(PIndexState.INACTIVE.getSerializedBytes(), indexState) == 0) { - upperBoundOfRebuild = indexStateCell.getTimestamp() + forwardOverlapDurationMs; - } else if (Bytes.compareTo(PIndexState.ACTIVE.getSerializedBytes(), indexState) == 0) { - // Since the index state cell is updated every time the INDEX_DISABLED_TIMESTAMP - // changes, we know the upper bound. 
- upperBoundOfRebuild = indexStateCell.getTimestamp() + 1; - } else { - LOG.warn("Unexpected index state of " + indexTableFullName + ":" + Bytes.toStringBinary(indexState)); - continue; // Ignore as this is an unexpected state - } + Long upperBoundOfRebuild = indexStateCell.getTimestamp() + forwardOverlapDurationMs; + // Pass in upperBoundOfRebuild when setting index state or increasing disable ts + // and fail if index timestamp > upperBoundOfRebuild. List<Pair<PTable,Long>> indexesToPartiallyRebuild = dataTableToIndexesMap.get(dataPTable); if (indexesToPartiallyRebuild == null) { indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(dataPTable.getIndexes().size()); @@ -377,7 +373,7 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } while (hasMore); if (dataTableToIndexesMap != null) { - long backwardOverlapDurationMs = env.getConfiguration().getLong( + long backwardOverlapDurationMs = env.getConfiguration().getLong( QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME_ATTRIB, env.getConfiguration().getLong(QueryServices.INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_TIME_ATTRIB, QueryServicesOptions.DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME)); @@ -385,7 +381,8 @@ public class MetaDataRegionObserver extends BaseRegionObserver { PTable dataPTable = entry.getKey(); List<Pair<PTable,Long>> pairs = entry.getValue(); List<PTable> indexesToPartiallyRebuild = Lists.newArrayListWithExpectedSize(pairs.size()); - try (HTableInterface metaTable = env.getTable( + try ( + HTableInterface metaTable = env.getTable( SchemaUtil.getPhysicalName(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES, props))) { long earliestDisableTimestamp = Long.MAX_VALUE; long latestUpperBoundTimestamp = Long.MIN_VALUE; @@ -436,7 +433,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver { MutationPlan plan = compiler.compile(Collections.singletonList(tableRef), null, null, null, scanEndTime); Scan dataTableScan = 
IndexManagementUtil.newLocalStateScan(plan.getContext().getScan(), maintainers); - // We can't allow partial results dataTableScan.setTimeRange(scanBeginTime, scanEndTime); dataTableScan.setCacheBlocks(false); dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, TRUE_BYTES); @@ -463,30 +459,17 @@ public class MetaDataRegionObserver extends BaseRegionObserver { indexPTable.getSchemaName().getString(), indexPTable.getTableName().getString()); if (scanEndTime == latestUpperBoundTimestamp) { - // Finished building. Pass in the expected value for the index disabled timestamp - // and only set to active if it hasn't changed (i.e. a write failed again, before - // we're done). We take the absolute value because if the option to leave the - // index active upon failure is used, we'll see a negative value when a write - // fails. - IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.ACTIVE, Math.abs(indexPTable.getIndexDisableTimestamp())); + IndexUtil.updateIndexState(conn, indexTableFullName, PIndexState.ACTIVE, 0L, latestUpperBoundTimestamp); batchExecutedPerTableMap.remove(dataPTable.getName()); LOG.info("Making Index:" + indexPTable.getTableName() + " active after rebuilding"); } else { - // Maintain sign of INDEX_DISABLE_TIMESTAMP (see comment above) - if (!updateDisableTimestamp(conn, indexTableFullName, scanEndTime * signOfDisableTimeStamp, metaTable, indexPTable.getIndexDisableTimestamp())) { - LOG.warn("The index disabled timestamp for " + indexTableFullName + " was updated outside of rebuilder. 
Will reattempt rebuild next iteration."); - } + // Increment timestamp so that client sees updated disable timestamp + IndexUtil.updateIndexState(conn, indexTableFullName, indexPTable.getIndexState(), scanEndTime * signOfDisableTimeStamp, latestUpperBoundTimestamp); Long noOfBatches = batchExecutedPerTableMap.get(dataPTable.getName()); if (noOfBatches == null) { noOfBatches = 0l; } batchExecutedPerTableMap.put(dataPTable.getName(), ++noOfBatches); - // clearing cache to get the updated - // disabled timestamp - new MetaDataClient(conn).updateCache(dataPTable.getSchemaName().getString(), - dataPTable.getTableName().getString()); - new MetaDataClient(conn).updateCache(indexPTable.getSchemaName().getString(), - indexPTable.getTableName().getString()); LOG.info("During Round-robin build: Successfully updated index disabled timestamp for " + indexTableFullName + " to " + scanEndTime); } @@ -531,17 +514,6 @@ public class MetaDataRegionObserver extends BaseRegionObserver { } } - private static boolean updateDisableTimestamp(PhoenixConnection conn, String indexTableName, - long disabledTimestamp, HTableInterface metaTable, long expectedDisabledTimestamp) throws IOException { - byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); - Put put = new Put(indexTableKey); - put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, - PLong.INSTANCE.toBytes(disabledTimestamp)); - return metaTable.checkAndPut(indexTableKey, PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, - PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, CompareOp.EQUAL, PLong.INSTANCE.toBytes(expectedDisabledTimestamp), - put); - } - @VisibleForTesting public static synchronized void initRebuildIndexConnectionProps(Configuration config) { if (rebuildIndexConnectionProps == null) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java 
---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java index 2fdb279..0100f9f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/MutationState.java @@ -474,10 +474,10 @@ public class MutationState implements SQLCloseable { final long timestamp, boolean includeAllIndexes, final boolean sendAll) { final PTable table = tableRef.getTable(); final Iterator<PTable> indexes = // Only maintain tables with immutable rows through this client-side mechanism - includeAllIndexes || table.isWALDisabled() ? // TODO: remove check for isWALDisabled once PHOENIX-3137 is fixed. - IndexMaintainer.nonDisabledIndexIterator(table.getIndexes().iterator()) : + includeAllIndexes ? + IndexMaintainer.maintainedIndexes(table.getIndexes().iterator()) : table.isImmutableRows() ? - IndexMaintainer.enabledGlobalIndexIterator(table.getIndexes().iterator()) : + IndexMaintainer.maintainedGlobalIndexes(table.getIndexes().iterator()) : Iterators.<PTable>emptyIterator(); final List<Mutation> mutationList = Lists.newArrayListWithExpectedSize(values.size()); final List<Mutation> mutationsPertainingToIndex = indexes.hasNext() ? Lists.<Mutation>newArrayListWithExpectedSize(values.size()) : null; @@ -708,7 +708,7 @@ public class MutationState implements SQLCloseable { for (PTable idxTtable : indexes) { // If index is still active, but has a non zero INDEX_DISABLE_TIMESTAMP value, then infer that // our failure mode is block writes on index failure. 
- if (idxTtable.getIndexState() == PIndexState.ACTIVE && idxTtable.getIndexDisableTimestamp() > 0) { + if ((idxTtable.getIndexState() == PIndexState.ACTIVE || idxTtable.getIndexState() == PIndexState.PENDING_ACTIVE) && idxTtable.getIndexDisableTimestamp() > 0) { throw new SQLExceptionInfo.Builder(SQLExceptionCode.INDEX_FAILURE_BLOCK_WRITE) .setSchemaName(table.getSchemaName().getString()) .setTableName(table.getTableName().getString()).build().buildException(); @@ -796,7 +796,7 @@ public class MutationState implements SQLCloseable { try { PTable table = tableRef.getTable(); List<PTable> indexes = table.getIndexes(); - Iterator<PTable> enabledIndexes = IndexMaintainer.nonDisabledIndexIterator(indexes.iterator()); + Iterator<PTable> enabledIndexes = IndexMaintainer.maintainedIndexes(indexes.iterator()); if (enabledIndexes.hasNext()) { List<PTable> keyValueIndexes = Collections.emptyList(); ImmutableBytesWritable indexMetaDataPtr = new ImmutableBytesWritable(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java index c5c3dda..83b1d58 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/IndexMaintainer.java @@ -143,29 +143,34 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { return maintainer; } - public static Iterator<PTable> nonDisabledIndexIterator(Iterator<PTable> indexes) { + private static boolean sendIndexMaintainer(PTable index) { + PIndexState indexState = index.getIndexState(); + return ! 
( PIndexState.DISABLE == indexState || PIndexState.PENDING_ACTIVE == indexState ); + } + + public static Iterator<PTable> maintainedIndexes(Iterator<PTable> indexes) { return Iterators.filter(indexes, new Predicate<PTable>() { @Override public boolean apply(PTable index) { - return !PIndexState.DISABLE.equals(index.getIndexState()); + return sendIndexMaintainer(index); } }); } - public static Iterator<PTable> enabledGlobalIndexIterator(Iterator<PTable> indexes) { + public static Iterator<PTable> maintainedGlobalIndexes(Iterator<PTable> indexes) { return Iterators.filter(indexes, new Predicate<PTable>() { @Override public boolean apply(PTable index) { - return !PIndexState.DISABLE.equals(index.getIndexState()) && !index.getIndexType().equals(IndexType.LOCAL); + return sendIndexMaintainer(index) && index.getIndexType() == IndexType.GLOBAL; } }); } - public static Iterator<PTable> enabledLocalIndexIterator(Iterator<PTable> indexes) { + public static Iterator<PTable> maintainedLocalIndexes(Iterator<PTable> indexes) { return Iterators.filter(indexes, new Predicate<PTable>() { @Override public boolean apply(PTable index) { - return !PIndexState.DISABLE.equals(index.getIndexState()) && index.getIndexType().equals(IndexType.LOCAL); + return sendIndexMaintainer(index) && index.getIndexType() == IndexType.LOCAL; } }); } @@ -188,9 +193,9 @@ public class IndexMaintainer implements Writable, Iterable<ColumnReference> { */ public static void serialize(PTable dataTable, ImmutableBytesWritable ptr, List<PTable> indexes, PhoenixConnection connection) { - Iterator<PTable> indexesItr = nonDisabledIndexIterator(indexes.iterator()); + Iterator<PTable> indexesItr = maintainedIndexes(indexes.iterator()); if ((dataTable.isImmutableRows()) || !indexesItr.hasNext()) { - indexesItr = enabledLocalIndexIterator(indexesItr); + indexesItr = maintainedLocalIndexes(indexesItr); if (!indexesItr.hasNext()) { ptr.set(ByteUtil.EMPTY_BYTE_ARRAY); return; @@ -209,8 +214,8 @@ public class IndexMaintainer 
implements Writable, Iterable<ColumnReference> { // Write out data row key schema once, since it's the same for all index maintainers dataTable.getRowKeySchema().write(output); indexesItr = - dataTable.isImmutableRows() ? enabledLocalIndexIterator(indexes.iterator()) - : nonDisabledIndexIterator(indexes.iterator()); + dataTable.isImmutableRows() ? maintainedLocalIndexes(indexes.iterator()) + : maintainedIndexes(indexes.iterator()); while (indexesItr.hasNext()) { org.apache.phoenix.coprocessor.generated.ServerCachingProtos.IndexMaintainer proto = IndexMaintainer.toProto(indexesItr.next().getIndexMaintainer(dataTable, connection)); byte[] protoBytes = proto.toByteArray(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java index 671e3ef..5666da9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/index/PhoenixIndexFailurePolicy.java @@ -51,7 +51,6 @@ import org.apache.phoenix.query.QueryServices; import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PIndexState; import org.apache.phoenix.schema.PTable; -import org.apache.phoenix.schema.PTable.IndexType; import org.apache.phoenix.util.IndexUtil; import org.apache.phoenix.util.MetaDataUtil; import org.apache.phoenix.util.PhoenixRuntime; @@ -201,7 +200,7 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { return timestamp; } - PIndexState newState = disableIndexOnFailure ? PIndexState.DISABLE : PIndexState.ACTIVE; + PIndexState newState = disableIndexOnFailure ? 
PIndexState.DISABLE : PIndexState.PENDING_ACTIVE; // for all the index tables that we've found, try to disable them and if that fails, try to for (Map.Entry<String, Long> tableTimeElement :indexTableNames.entrySet()){ String indexTableName = tableTimeElement.getKey(); @@ -266,12 +265,9 @@ public class PhoenixIndexFailurePolicy extends DelegateIndexFailurePolicy { Map<ImmutableBytesWritable, String> localIndexNames = new HashMap<ImmutableBytesWritable, String>(); for (PTable index : indexes) { - if (index.getIndexType() == IndexType.LOCAL - && index.getIndexState() == PIndexState.ACTIVE) { - if (localIndex == null) localIndex = index; - localIndexNames.put(new ImmutableBytesWritable(MetaDataUtil.getViewIndexIdDataType().toBytes( - index.getViewIndexId())), index.getName().getString()); - } + if (localIndex == null) localIndex = index; + localIndexNames.put(new ImmutableBytesWritable(MetaDataUtil.getViewIndexIdDataType().toBytes( + index.getViewIndexId())), index.getName().getString()); } if (localIndex == null) { return Collections.emptySet(); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java index ac7f3ef..c34d20d 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/jdbc/PhoenixDatabaseMetaData.java @@ -321,6 +321,7 @@ public class PhoenixDatabaseMetaData implements DatabaseMetaData { public static final int MIN_LOCAL_SI_VERSION_DISALLOW = VersionUtil.encodeVersion("0", "98", "6"); public static final int MIN_RENEW_LEASE_VERSION = VersionUtil.encodeVersion("1", "1", "3"); public static final int MIN_NAMESPACE_MAPPED_PHOENIX_VERSION = 
VersionUtil.encodeVersion("4", "8", "0"); + public static final int MIN_PENDING_ACTIVE_INDEX = VersionUtil.encodeVersion("4", "12", "0"); // Version below which we should turn off essential column family. public static final int ESSENTIAL_FAMILY_VERSION_THRESHOLD = VersionUtil.encodeVersion("0", "94", "7"); http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java index 2bfc5fb..ca7ff2c 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/optimize/QueryOptimizer.java @@ -227,7 +227,8 @@ public class QueryOptimizer { // We will or will not do tuple projection according to the data plan. boolean isProjected = dataPlan.getContext().getResolver().getTables().get(0).getTable().getType() == PTableType.PROJECTED; // Check index state of now potentially updated index table to make sure it's active - if (PIndexState.ACTIVE.equals(resolver.getTables().get(0).getTable().getIndexState())) { + PIndexState indexState = resolver.getTables().get(0).getTable().getIndexState(); + if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE) { try { // translate nodes that match expressions that are indexed to the associated column parse node indexSelect = ParseNodeRewriter.rewrite(indexSelect, new IndexExpressionParseNodeRewriter(index, null, statement.getConnection(), indexSelect.getUdfParseNodes())); @@ -243,9 +244,10 @@ public class QueryOptimizer { && !plan.getContext().getDataColumns().isEmpty()) { return null; } + indexState = plan.getTableRef().getTable().getIndexState(); // Checking number of columns handles the wildcard cases correctly, as in that case the 
index // must contain all columns from the data table to be able to be used. - if (plan.getTableRef().getTable().getIndexState() == PIndexState.ACTIVE) { + if (indexState == PIndexState.ACTIVE || indexState == PIndexState.PENDING_ACTIVE) { if (plan.getProjector().getColumnCount() == nColumns) { return plan; } else if (index.getIndexType() == IndexType.GLOBAL) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 4e0d4cf..94109aa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -41,6 +41,7 @@ import static org.apache.phoenix.query.QueryServices.HBASE_CLIENT_SCANNER_TIMEOU import static org.apache.phoenix.query.QueryServices.IMMUTABLE_ROWS_ATTRIB; import static org.apache.phoenix.query.QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB; import static org.apache.phoenix.query.QueryServices.INDEX_POPULATION_SLEEP_TIME; +import static org.apache.phoenix.query.QueryServices.INDEX_REBUILD_TASK_INITIAL_DELAY; import static org.apache.phoenix.query.QueryServices.IS_NAMESPACE_MAPPING_ENABLED; import static org.apache.phoenix.query.QueryServices.IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE; import static org.apache.phoenix.query.QueryServices.KEEP_ALIVE_MS_ATTRIB; @@ -57,6 +58,11 @@ import static org.apache.phoenix.query.QueryServices.MAX_TENANT_MEMORY_PERC_ATTR import static org.apache.phoenix.query.QueryServices.MIN_STATS_UPDATE_FREQ_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.NUM_RETRIES_FOR_SCHEMA_UPDATE_CHECK; +import 
static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_CLUSTER_BASE_PATH; +import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_LOADBALANCER_ENABLED; +import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_SERVICE_NAME; +import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD; +import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_ZK_ACL_USERNAME; import static org.apache.phoenix.query.QueryServices.QUEUE_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.REGIONSERVER_INFO_PORT_ATTRIB; import static org.apache.phoenix.query.QueryServices.RENEW_LEASE_ENABLED; @@ -79,11 +85,6 @@ import static org.apache.phoenix.query.QueryServices.STATS_USE_CURRENT_TIME_ATTR import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB; import static org.apache.phoenix.query.QueryServices.TRACING_BATCH_SIZE; -import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_SERVICE_NAME; -import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_LOADBALANCER_ENABLED; -import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_CLUSTER_BASE_PATH; -import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD; -import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_ZK_ACL_USERNAME; import static org.apache.phoenix.query.QueryServices.TRACING_ENABLED; import static org.apache.phoenix.query.QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB; import static org.apache.phoenix.query.QueryServices.TRACING_THREAD_POOL_SIZE; @@ -782,4 +783,9 @@ public class QueryServicesOptions { return this; } + public QueryServicesOptions setIndexRebuildTaskInitialDelay(long waitTime) { + config.setLong(INDEX_REBUILD_TASK_INITIAL_DELAY, waitTime); + return this; + } + } 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java index cf9db4c..d7dbeca 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/PIndexState.java @@ -26,7 +26,8 @@ public enum PIndexState { ACTIVE("a"), INACTIVE("i"), DISABLE("x"), - REBUILD("r"); + REBUILD("r"), + PENDING_ACTIVE("p"); private final String serializedValue; private final byte[] serializedBytes; http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java index ea6fb96..1b6f9d5 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/IndexUtil.java @@ -17,6 +17,9 @@ */ package org.apache.phoenix.util; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MAJOR_VERSION; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_MINOR_VERSION; +import static org.apache.phoenix.coprocessor.MetaDataProtocol.PHOENIX_PATCH_NUMBER; import static org.apache.phoenix.query.QueryConstants.LOCAL_INDEX_COLUMN_FAMILY_PREFIX; import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_FAMILY; import static org.apache.phoenix.query.QueryConstants.VALUE_COLUMN_QUALIFIER; @@ -81,6 +84,7 @@ import org.apache.phoenix.hbase.index.ValueGetter; import org.apache.phoenix.hbase.index.covered.update.ColumnReference; import 
org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.hbase.index.util.VersionUtil; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData; @@ -741,6 +745,7 @@ public class IndexUtil { MutationProto mp = ProtobufUtil.toProto(m); builder.addTableMetadataMutations(mp.toByteString()); } + builder.setClientVersion(VersionUtil.encodeVersion(PHOENIX_MAJOR_VERSION, PHOENIX_MINOR_VERSION, PHOENIX_PATCH_NUMBER)); instance.updateIndexState(controller, builder.build(), rpcCallback); if (controller.getFailedOn() != null) { throw controller.getFailedOn(); } return rpcCallback.get(); @@ -793,7 +798,12 @@ public class IndexUtil { } public static void updateIndexState(PhoenixConnection conn, String indexTableName, - PIndexState newState, Long indexDisableTimestamp) throws SQLException { + PIndexState newState, Long indexDisableTimestamp) throws SQLException { + updateIndexState(conn, indexTableName, newState, indexDisableTimestamp, HConstants.LATEST_TIMESTAMP); + } + + public static void updateIndexState(PhoenixConnection conn, String indexTableName, + PIndexState newState, Long indexDisableTimestamp, Long expectedMaxTimestamp) throws SQLException { byte[] indexTableKey = SchemaUtil.getTableKeyFromFullName(indexTableName); String schemaName = SchemaUtil.getSchemaNameFromFullName(indexTableName); String indexName = SchemaUtil.getTableNameFromFullName(indexTableName); @@ -801,10 +811,12 @@ public class IndexUtil { // index state Put put = new Put(indexTableKey); put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_STATE_BYTES, + expectedMaxTimestamp, newState.getSerializedBytes()); if (indexDisableTimestamp != null) { put.addColumn(PhoenixDatabaseMetaData.TABLE_FAMILY_BYTES, PhoenixDatabaseMetaData.INDEX_DISABLE_TIMESTAMP_BYTES, + expectedMaxTimestamp, 
PLong.INSTANCE.toBytes(indexDisableTimestamp)); } if (newState == PIndexState.ACTIVE) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java index 0b69206..3c73495 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java @@ -38,7 +38,6 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { private static final int DEFAULT_MAX_MEMORY_PERC = 30; // 30% of heap private static final int DEFAULT_THREAD_TIMEOUT_MS = 60000*5; //5min private static final int DEFAULT_SPOOL_THRESHOLD_BYTES = 1024 * 1024; // 1m - private static final int DEFAULT_MAX_MEMORY_WAIT_MS = 0; private static final int DEFAULT_MAX_TENANT_MEMORY_PERC = 100; private static final int DEFAULT_MAX_SERVER_CACHE_TIME_TO_LIVE_MS = 60000 * 60; // 1HR (to prevent age-out of hash cache during debugging) private static final long DEFAULT_MAX_HASH_CACHE_SIZE = 1024*1024*10; // 10 Mb @@ -64,6 +63,12 @@ public final class QueryServicesTestImpl extends BaseQueryServicesImpl { public static final int DEFAULT_HTABLE_MAX_THREADS = 10; public static final long DEFAULT_INDEX_POPULATION_WAIT_TIME = 0; public static final boolean DEFAULT_TRANSACTIONS_ENABLED = true; + /* + * Effectively disable running the index rebuild task by having an infinite delay + * because we want to control its execution ourselves + */ + public static final long DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY = Long.MAX_VALUE; + /** * Set number of salt buckets lower for sequence table during testing, as a high @@ -109,7 +114,8 @@ public final class QueryServicesTestImpl extends
BaseQueryServicesImpl { .setHConnectionPoolCoreSize(DEFAULT_HCONNECTION_POOL_CORE_SIZE) .setHConnectionPoolMaxSize(DEFAULT_HCONNECTION_POOL_MAX_SIZE) .setMaxThreadsPerHTable(DEFAULT_HTABLE_MAX_THREADS) - .setDefaultIndexPopulationWaitTime(DEFAULT_INDEX_POPULATION_WAIT_TIME); + .setDefaultIndexPopulationWaitTime(DEFAULT_INDEX_POPULATION_WAIT_TIME) + .setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY); } public QueryServicesTestImpl(ReadOnlyProps defaultProps, ReadOnlyProps overrideProps) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f480b29f/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java index 0129eda..45fd52c 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java @@ -75,7 +75,6 @@ import org.apache.phoenix.compile.StatementContext; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheRequest; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.ClearCacheResponse; import org.apache.phoenix.coprocessor.generated.MetaDataProtos.MetaDataService; -import org.apache.phoenix.end2end.index.PartialIndexRebuilderIT.WriteFailingRegionObserver; import org.apache.phoenix.execute.MutationState; import org.apache.phoenix.expression.AndExpression; import org.apache.phoenix.expression.ByteBasedLikeExpression; @@ -888,10 +887,10 @@ public class TestUtil { public static void waitForIndexRebuild(Connection conn, String fullIndexName, PIndexState indexState) throws InterruptedException, SQLException { waitForIndexState(conn, fullIndexName, indexState, 0L); } - + private enum IndexStateCheck {SUCCESS, FAIL, KEEP_TRYING}; public static void waitForIndexState(Connection conn, String fullIndexName, 
PIndexState expectedIndexState, Long expectedIndexDisableTimestamp) throws InterruptedException, SQLException { - int maxTries = 300, nTries = 0; + int maxTries = 60, nTries = 0; do { Thread.sleep(1000); // sleep 1 sec IndexStateCheck state = checkIndexStateInternal(conn, fullIndexName, expectedIndexState, expectedIndexDisableTimestamp); @@ -918,12 +917,9 @@ public class TestUtil { ResultSet rs = conn.createStatement().executeQuery(query); if (rs.next()) { Long actualIndexDisableTimestamp = rs.getLong(1); - if (rs.wasNull()) { - actualIndexDisableTimestamp = null; - } PIndexState actualIndexState = PIndexState.fromSerializedValue(rs.getString(2)); - boolean matchesExpected = Objects.equal(actualIndexDisableTimestamp, expectedIndexDisableTimestamp) && - actualIndexState == expectedIndexState; + boolean matchesExpected = (expectedIndexDisableTimestamp == null || Objects.equal(actualIndexDisableTimestamp, expectedIndexDisableTimestamp)) + && actualIndexState == expectedIndexState; if (matchesExpected) { return IndexStateCheck.SUCCESS; } @@ -949,6 +945,7 @@ public class TestUtil { }else{ return; } + final int retries = 10; int numTries = 10; try (HBaseAdmin admin = services.getAdmin()) { admin.modifyTable(Bytes.toBytes(tableName), descriptor); @@ -957,13 +954,38 @@ public class TestUtil { numTries--; if (numTries == 0) { throw new Exception( - "Check to detect if delaying co-processor was added failed after " - + numTries + " retries."); + "Failed to add " + coprocessorClass.getName() + " after " + + retries + " retries."); } Thread.sleep(1000); } } } + public static void removeCoprocessor(Connection conn, String tableName, Class coprocessorClass) throws Exception { + ConnectionQueryServices services = conn.unwrap(PhoenixConnection.class).getQueryServices(); + HTableDescriptor descriptor = services.getTableDescriptor(Bytes.toBytes(tableName)); + if (descriptor.getCoprocessors().contains(coprocessorClass.getName())) { + 
descriptor.removeCoprocessor(coprocessorClass.getName()); + }else{ + return; + } + final int retries = 10; + int numTries = retries; + try (HBaseAdmin admin = services.getAdmin()) { + admin.modifyTable(Bytes.toBytes(tableName), descriptor); + while (!admin.getTableDescriptor(Bytes.toBytes(tableName)).equals(descriptor) + && numTries > 0) { + numTries--; + if (numTries == 0) { + throw new Exception( + "Failed to remove " + coprocessorClass.getName() + " after " + + retries + " retries."); + } + Thread.sleep(1000); + } + } + } + }
