This is an automated email from the ASF dual-hosted git repository.
kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 27ef302 PHOENIX-5535 Index rebuilds via
UngroupedAggregateRegionObserver should replay delete markers
27ef302 is described below
commit 27ef3024cd3cd3bcf521cd932deb66166969f321
Author: Kadir <[email protected]>
AuthorDate: Sun Oct 20 01:02:22 2019 -0700
PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should
replay delete markers
---
.../org/apache/phoenix/end2end/IndexToolIT.java | 72 +++++-
.../apache/phoenix/compile/PostDDLCompiler.java | 253 ++++++++++++---------
.../phoenix/compile/ServerBuildIndexCompiler.java | 109 +++++----
.../apache/phoenix/index/GlobalIndexChecker.java | 8 +-
.../PhoenixServerBuildIndexInputFormat.java | 10 +-
.../org/apache/phoenix/schema/MetaDataClient.java | 9 +-
6 files changed, 278 insertions(+), 183 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index a4b7bdc..9827140 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -29,6 +29,7 @@ import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.sql.Types;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -39,6 +40,7 @@ import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.PartialIndexRebuilderIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.index.IndexTool;
import org.apache.phoenix.query.ConnectionQueryServices;
@@ -74,10 +77,12 @@ import org.junit.runners.Parameterized.Parameters;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@RunWith(Parameterized.class)
public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
-
+ private static final Logger LOGGER =
LoggerFactory.getLogger(IndexToolIT.class);
private final boolean localIndex;
private final boolean mutable;
private final boolean transactional;
@@ -251,6 +256,71 @@ public class IndexToolIT extends
BaseUniqueNamesOwnClusterIT {
}
}
+ private void setEveryNthRowWithNull(int nrows, int nthRowNull,
PreparedStatement stmt) throws Exception {
+ for (int i = 0; i < nrows; i++) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, i * 10);
+ if (i % nthRowNull != 0) {
+ stmt.setInt(3, 9000 + i * nthRowNull);
+ } else {
+ stmt.setNull(3, Types.INTEGER);
+ }
+ stmt.execute();
+ }
+ }
+
+ @Test
+ public void testWithSetNull() throws Exception {
+ // This test is for building non-transactional mutable global indexes
with direct api
+ if (localIndex || transactional || !mutable) {
+ return;
+ }
+ // This tests the cases where a column having a null value is
overwritten with a not null value and vice versa;
+ // and after that the index table is still rebuilt correctly
+ final int NROWS = 2 * 3 * 5 * 7;
+ String schemaName = generateUniqueName();
+ String dataTableName = generateUniqueName();
+ String dataTableFullName = SchemaUtil.getTableName(schemaName,
dataTableName);
+ String indexTableName = generateUniqueName();
+ String indexTableFullName = SchemaUtil.getTableName(schemaName,
indexTableName);
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String stmString1 =
+ "CREATE TABLE " + dataTableFullName
+ + " (ID INTEGER NOT NULL PRIMARY KEY, VAL INTEGER,
ZIP INTEGER) "
+ + tableDDLOptions;
+ conn.createStatement().execute(stmString1);
+ String upsertStmt = "UPSERT INTO " + dataTableFullName + "
VALUES(?,?,?)";
+ PreparedStatement stmt = conn.prepareStatement(upsertStmt);
+ setEveryNthRowWithNull(NROWS, 2, stmt);
+ conn.commit();
+ setEveryNthRowWithNull(NROWS, 3, stmt);
+ conn.commit();
+ String stmtString2 =
+ String.format(
+ "CREATE %s INDEX %s ON %s (VAL) INCLUDE (ZIP)
ASYNC ",
+ (localIndex ? "LOCAL" : ""), indexTableName,
dataTableFullName);
+ conn.createStatement().execute(stmtString2);
+ // Run the index MR job and verify that the index table is built
correctly
+ IndexTool indexTool = runIndexTool(directApi, useSnapshot,
schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+ assertEquals(NROWS,
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn,
dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+
+ // Repeat the test with compaction
+ setEveryNthRowWithNull(NROWS, 5, stmt);
+ conn.commit();
+ setEveryNthRowWithNull(NROWS, 7, stmt);
+ conn.commit();
+ TestUtil.doMajorCompaction(conn, dataTableFullName);
+ // Run the index MR job and verify that the index table is built
correctly
+ indexTool = runIndexTool(directApi, useSnapshot, schemaName,
dataTableName, indexTableName, null, 0, new String[0]);
+ assertEquals(NROWS,
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+ actualRowCount = IndexScrutiny.scrutinizeIndex(conn,
dataTableFullName, indexTableFullName);
+ assertEquals(NROWS, actualRowCount);
+ }
+ }
+
@Test
public void testBuildSecondaryIndexAndScrutinize() throws Exception {
// This test is for building non-transactional global indexes with
direct api
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index a74c5f1..04a3188 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -72,6 +72,7 @@ import com.google.common.collect.Lists;
public class PostDDLCompiler {
private final PhoenixConnection connection;
private final Scan scan;
+ private PostDDLMutationPlan mutationPlan = null;
public PostDDLCompiler(PhoenixConnection connection) {
this(connection, new Scan());
@@ -91,7 +92,12 @@ public class PostDDLCompiler {
new MultipleTableRefColumnResolver(tableRefs),
scan,
new SequenceManager(statement));
- return new PostDDLMutationPlan(context, tableRefs, timestamp, emptyCF,
deleteList, projectCFs);
+ this.mutationPlan = new PostDDLMutationPlan(context, tableRefs,
timestamp, emptyCF, deleteList, projectCFs);
+ return this.mutationPlan;
+ }
+
+ public QueryPlan getQueryPlan(TableRef tableRef) throws SQLException {
+ return mutationPlan.getQueryPlan(tableRef);
}
private static class MultipleTableRefColumnResolver implements
ColumnResolver {
@@ -165,11 +171,11 @@ public class PostDDLCompiler {
this.projectCFs = projectCFs;
}
- @Override
- public MutationState execute() throws SQLException {
+ public QueryPlan getQueryPlan(final TableRef tableRef) throws
SQLException {
if (tableRefs.isEmpty()) {
- return new MutationState(0, 1000, connection);
+ return null;
}
+ QueryPlan plan = null;
boolean wasAutoCommit = connection.getAutoCommit();
try {
connection.setAutoCommit(true);
@@ -181,127 +187,152 @@ public class PostDDLCompiler {
* 3) updating the necessary rows to have an empty KV
* 4) updating table stats
*/
- long totalMutationCount = 0;
- for (final TableRef tableRef : tableRefs) {
- Scan scan = ScanUtil.newScan(context.getScan());
- SelectStatement select = SelectStatement.COUNT_ONE;
- // We need to use this tableRef
- ColumnResolver resolver = new
SingleTableRefColumnResolver(tableRef);
- PhoenixStatement statement = new
PhoenixStatement(connection);
- StatementContext context = new StatementContext(statement,
resolver, scan, new SequenceManager(statement));
- long ts = timestamp;
- // FIXME: DDL operations aren't transactional, so we're
basing the timestamp on a server timestamp.
- // Not sure what the fix should be. We don't need conflict
detection nor filtering of invalid transactions
- // in this case, so maybe this is ok.
- if (ts!= HConstants.LATEST_TIMESTAMP &&
tableRef.getTable().isTransactional()) {
- ts = TransactionUtil.convertToNanoseconds(ts);
- }
- ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(),
ts);
- if (emptyCF != null) {
- scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF,
emptyCF);
-
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER,
EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
- }
- ServerCache cache = null;
- try {
- if (deleteList != null) {
- if (deleteList.isEmpty()) {
-
scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
- // In the case of a row deletion, add index
metadata so mutable secondary indexing works
- /* TODO: we currently manually run a scan to
delete the index data here
- ImmutableBytesWritable ptr =
context.getTempPtr();
- tableRef.getTable().getIndexMaintainers(ptr);
- if (ptr.getLength() > 0) {
- IndexMetaDataCacheClient client = new
IndexMetaDataCacheClient(connection, tableRef);
- cache =
client.addIndexMetadataCache(context.getScanRanges(), ptr);
- byte[] uuidValue = cache.getId();
-
scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
- }
- */
- } else {
- // In the case of the empty key value column
family changing, do not send the index
- // metadata, as we're currently managing this
from the client. It's possible for the
- // data empty column family to stay the same,
while the index empty column family
- // changes.
- PColumn column = deleteList.get(0);
- byte[] cq = column.getColumnQualifierBytes();
- if (emptyCF == null) {
-
scan.addColumn(column.getFamilyName().getBytes(), cq);
- }
-
scan.setAttribute(BaseScannerRegionObserver.DELETE_CF,
column.getFamilyName().getBytes());
-
scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq);
- }
- }
- List<byte[]> columnFamilies =
Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
- if (projectCFs == null) {
- for (PColumnFamily family :
tableRef.getTable().getColumnFamilies()) {
-
columnFamilies.add(family.getName().getBytes());
+
+ Scan scan = ScanUtil.newScan(context.getScan());
+ SelectStatement select = SelectStatement.COUNT_ONE;
+ // We need to use this tableRef
+ ColumnResolver resolver = new
SingleTableRefColumnResolver(tableRef);
+ PhoenixStatement statement = new PhoenixStatement(connection);
+ StatementContext context = new StatementContext(statement,
resolver, scan, new SequenceManager(statement));
+ long ts = timestamp;
+ // FIXME: DDL operations aren't transactional, so we're basing
the timestamp on a server timestamp.
+ // Not sure what the fix should be. We don't need conflict
detection nor filtering of invalid transactions
+ // in this case, so maybe this is ok.
+ if (ts!= HConstants.LATEST_TIMESTAMP &&
tableRef.getTable().isTransactional()) {
+ ts = TransactionUtil.convertToNanoseconds(ts);
+ }
+ ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts);
+ if (emptyCF != null) {
+ scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF,
emptyCF);
+
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER,
EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
+ }
+ ServerCache cache = null;
+ try {
+ if (deleteList != null) {
+ if (deleteList.isEmpty()) {
+
scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
+ // In the case of a row deletion, add index
metadata so mutable secondary indexing works
+ /* TODO: we currently manually run a scan to
delete the index data here
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ tableRef.getTable().getIndexMaintainers(ptr);
+ if (ptr.getLength() > 0) {
+ IndexMetaDataCacheClient client = new
IndexMetaDataCacheClient(connection, tableRef);
+ cache =
client.addIndexMetadataCache(context.getScanRanges(), ptr);
+ byte[] uuidValue = cache.getId();
+
scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
}
+ */
} else {
- for (byte[] projectCF : projectCFs) {
- columnFamilies.add(projectCF);
+ // In the case of the empty key value column
family changing, do not send the index
+ // metadata, as we're currently managing this from
the client. It's possible for the
+ // data empty column family to stay the same,
while the index empty column family
+ // changes.
+ PColumn column = deleteList.get(0);
+ byte[] cq = column.getColumnQualifierBytes();
+ if (emptyCF == null) {
+
scan.addColumn(column.getFamilyName().getBytes(), cq);
}
+
scan.setAttribute(BaseScannerRegionObserver.DELETE_CF,
column.getFamilyName().getBytes());
+
scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq);
}
- // Need to project all column families into the scan,
since we haven't yet created our empty key value
- RowProjector projector =
ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE,
GroupBy.EMPTY_GROUP_BY);
- context.getAggregationManager().compile(context,
GroupBy.EMPTY_GROUP_BY);
- // Explicitly project these column families and don't
project the empty key value,
- // since at this point we haven't added the empty key
value everywhere.
- if (columnFamilies != null) {
- scan.getFamilyMap().clear();
- for (byte[] family : columnFamilies) {
- scan.addFamily(family);
- }
- projector = new RowProjector(projector,false);
+ }
+ List<byte[]> columnFamilies =
Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
+ if (projectCFs == null) {
+ for (PColumnFamily family :
tableRef.getTable().getColumnFamilies()) {
+ columnFamilies.add(family.getName().getBytes());
}
- // Ignore exceptions due to not being able to resolve
any view columns,
- // as this just means the view is invalid. Continue on
and try to perform
- // any other Post DDL operations.
- try {
- // Since dropping a VIEW does not affect the
underlying data, we do
- // not need to pass through the view statement
here.
- WhereCompiler.compile(context, select); // Push
where clause into scan
- } catch (ColumnFamilyNotFoundException e) {
- continue;
- } catch (ColumnNotFoundException e) {
- continue;
- } catch (AmbiguousColumnException e) {
- continue;
+ } else {
+ for (byte[] projectCF : projectCFs) {
+ columnFamilies.add(projectCF);
}
- QueryPlan plan = new AggregatePlan(context, select,
tableRef, projector, null, null,
- OrderBy.EMPTY_ORDER_BY, null,
GroupBy.EMPTY_GROUP_BY, null, null);
+ }
+ // Need to project all column families into the scan,
since we haven't yet created our empty key value
+ RowProjector projector =
ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE,
GroupBy.EMPTY_GROUP_BY);
+ context.getAggregationManager().compile(context,
GroupBy.EMPTY_GROUP_BY);
+ // Explicitly project these column families and don't
project the empty key value,
+ // since at this point we haven't added the empty key
value everywhere.
+ if (columnFamilies != null) {
+ scan.getFamilyMap().clear();
+ for (byte[] family : columnFamilies) {
+ scan.addFamily(family);
+ }
+ projector = new RowProjector(projector,false);
+ }
+ // Ignore exceptions due to not being able to resolve any
view columns,
+ // as this just means the view is invalid. Continue on and
try to perform
+ // any other Post DDL operations.
+ try {
+ // Since dropping a VIEW does not affect the
underlying data, we do
+ // not need to pass through the view statement here.
+ WhereCompiler.compile(context, select); // Push where
clause into scan
+ } catch (ColumnFamilyNotFoundException e) {
+ return null;
+ } catch (ColumnNotFoundException e) {
+ return null;
+ } catch (AmbiguousColumnException e) {
+ return null;
+ }
+ plan = new AggregatePlan(context, select, tableRef,
projector, null, null,
+ OrderBy.EMPTY_ORDER_BY, null,
GroupBy.EMPTY_GROUP_BY, null, null);
+ } finally {
+ if (cache != null) { // Remove server cache if there is one
+ cache.close();
+ }
+ }
+ } finally {
+ if (!wasAutoCommit) connection.setAutoCommit(wasAutoCommit);
+ }
+ return plan;
+ }
+
+ @Override
+ public MutationState execute() throws SQLException {
+ if (tableRefs.isEmpty()) {
+ return new MutationState(0, 1000, connection);
+ }
+ boolean wasAutoCommit = connection.getAutoCommit();
+ try {
+ connection.setAutoCommit(true);
+ SQLException sqlE = null;
+ /*
+ * Handles:
+ * 1) deletion of all rows for a DROP TABLE and subsequently
deletion of all rows for a DROP INDEX;
+ * 2) deletion of all column values for a ALTER TABLE DROP
COLUMN
+ * 3) updating the necessary rows to have an empty KV
+ * 4) updating table stats
+ */
+ long totalMutationCount = 0;
+ for (final TableRef tableRef : tableRefs) {
+ QueryPlan plan = getQueryPlan(tableRef);
+ if (plan == null)
+ continue;
+ try {
+ ResultIterator iterator = plan.iterator();
try {
- ResultIterator iterator = plan.iterator();
+ Tuple row = iterator.next();
+ ImmutableBytesWritable ptr = context.getTempPtr();
+ totalMutationCount +=
(Long)plan.getProjector().getColumnProjector(0).getValue(row, PLong.INSTANCE,
ptr);
+ } catch (SQLException e) {
+ sqlE = e;
+ } finally {
try {
- Tuple row = iterator.next();
- ImmutableBytesWritable ptr =
context.getTempPtr();
- totalMutationCount +=
(Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
+ iterator.close();
} catch (SQLException e) {
- sqlE = e;
+ if (sqlE == null) {
+ sqlE = e;
+ } else {
+ sqlE.setNextException(e);
+ }
} finally {
- try {
- iterator.close();
- } catch (SQLException e) {
- if (sqlE == null) {
- sqlE = e;
- } else {
- sqlE.setNextException(e);
- }
- } finally {
- if (sqlE != null) {
- throw sqlE;
- }
+ if (sqlE != null) {
+ throw sqlE;
}
}
- } catch (TableNotFoundException e) {
- // Ignore and continue, as HBase throws when table
hasn't been written to
- // FIXME: Remove if this is fixed in 0.96
- }
- } finally {
- if (cache != null) { // Remove server cache if there
is one
- cache.close();
}
+ } catch (TableNotFoundException e) {
+ // Ignore and continue, as HBase throws when table
hasn't been written to
+ // FIXME: Remove if this is fixed in 0.96
}
-
}
final long count = totalMutationCount;
return new MutationState(1, 1000, connection) {
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 7d1c1b4..40cea2c 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -19,15 +19,16 @@ package org.apache.phoenix.compile;
import java.sql.SQLException;
import java.util.Collections;
-import java.util.List;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
import org.apache.phoenix.coprocessor.MetaDataProtocol;
import org.apache.phoenix.execute.MutationState;
import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.index.PhoenixIndexCodec;
import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -36,24 +37,22 @@ import org.apache.phoenix.schema.*;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PLong;
import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.StringUtil;
-
-import com.google.common.collect.Lists;
import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
/**
- * Class that compiles plan to generate initial data values after a DDL
command for
+ * Class that compiles queryPlan to generate initial data values after a DDL
command for
* index table.
*/
public class ServerBuildIndexCompiler {
private final PhoenixConnection connection;
- private final String tableName;
+ private final String indexTableFullName;
+ private final String dataTableFullName;
private PTable dataTable;
- private QueryPlan plan;
+ private QueryPlan queryPlan;
private class RowCountMutationPlan extends BaseMutationPlan {
private RowCountMutationPlan(StatementContext context,
PhoenixStatement.Operation operation) {
@@ -62,7 +61,7 @@ public class ServerBuildIndexCompiler {
@Override
public MutationState execute() throws SQLException {
connection.getMutationState().commitDDLFence(dataTable);
- Tuple tuple = plan.iterator().next();
+ Tuple tuple = queryPlan.iterator().next();
long rowCount = 0;
if (tuple != null) {
Cell kv = tuple.getValue(0);
@@ -78,61 +77,57 @@ public class ServerBuildIndexCompiler {
@Override
public QueryPlan getQueryPlan() {
- return plan;
+ return queryPlan;
}
};
- public ServerBuildIndexCompiler(PhoenixConnection connection, String
tableName) {
+ public ServerBuildIndexCompiler(PhoenixConnection connection, String
dataTableFullName, String indexTableFullName) {
this.connection = connection;
- this.tableName = tableName;
+ this.dataTableFullName = dataTableFullName;
+ this.indexTableFullName = indexTableFullName;
}
- public MutationPlan compile(PTable index) throws SQLException {
- try (final PhoenixStatement statement = new
PhoenixStatement(connection)) {
- String query = "SELECT count(*) FROM " + tableName;
- this.plan = statement.compileQuery(query);
- TableRef tableRef = plan.getTableRef();
- Scan scan = plan.getContext().getScan();
- ImmutableBytesWritable ptr = new ImmutableBytesWritable();
- dataTable = tableRef.getTable();
- if (index.getIndexType() == PTable.IndexType.GLOBAL &&
dataTable.isTransactional()) {
- throw new IllegalArgumentException(
- "ServerBuildIndexCompiler does not support global
indexes on transactional tables");
- }
- IndexMaintainer.serialize(dataTable, ptr,
Collections.singletonList(index), plan.getContext().getConnection());
- // Set the scan attributes that UngroupedAggregateRegionObserver
will switch on.
- // For local indexes, the
BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and
- // for global indexes PhoenixIndexCodec.INDEX_PROTO_MD attribute
is set to the serialized form of index
- // metadata to build index rows from data table rows. For global
indexes, we also need to set (1) the
- // BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to
signal UngroupedAggregateRegionObserver
- // that this scan is for building global indexes and (2) the
MetaDataProtocol.PHOENIX_VERSION attribute
- // that will be passed as a mutation attribute for the scanned
mutations that will be applied on
- // the index table possibly remotely
- if (index.getIndexType() == PTable.IndexType.LOCAL) {
-
scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO,
ByteUtil.copyKeyBytesIfNecessary(ptr));
- } else {
- scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD,
ByteUtil.copyKeyBytesIfNecessary(ptr));
- scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES,
TRUE_BYTES);
- ScanUtil.setClientVersion(scan,
MetaDataProtocol.PHOENIX_VERSION);
- }
- // By default, we'd use a FirstKeyOnly filter as nothing else
needs to be projected for count(*).
- // However, in this case, we need to project all of the data
columns that contribute to the index.
- IndexMaintainer indexMaintainer =
index.getIndexMaintainer(dataTable, connection);
- for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
- if (index.getImmutableStorageScheme() ==
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
- scan.addFamily(columnRef.getFamily());
- } else {
- scan.addColumn(columnRef.getFamily(),
columnRef.getQualifier());
- }
- }
-
- if (dataTable.isTransactional()) {
- scan.setAttribute(BaseScannerRegionObserver.TX_STATE,
connection.getMutationState().encodeTransaction());
- }
+ public MutationPlan compile() throws SQLException {
+ PTable index = PhoenixRuntime.getTable(connection, indexTableFullName);
+ dataTable = PhoenixRuntime.getTable(connection, dataTableFullName);
+ if (index.getIndexType() == PTable.IndexType.GLOBAL &&
dataTable.isTransactional()) {
+ throw new IllegalArgumentException(
+ "ServerBuildIndexCompiler does not support global indexes
on transactional tables");
+ }
+ PostDDLCompiler compiler = new PostDDLCompiler(connection);
+ TableRef dataTableRef = new TableRef(dataTable);
+ compiler.compile(Collections.singletonList(dataTableRef),
+ null, null, null, HConstants.LATEST_TIMESTAMP);
+ queryPlan = compiler.getQueryPlan(dataTableRef);
+ Scan dataTableScan =
IndexManagementUtil.newLocalStateScan(queryPlan.getContext().getScan(),
+ Collections.singletonList(index.getIndexMaintainer(dataTable,
connection)));
+ ImmutableBytesWritable indexMetaDataPtr = new
ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+ IndexMaintainer.serializeAdditional(dataTable, indexMetaDataPtr,
Collections.singletonList(index),
+ connection);
+ byte[] attribValue =
ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
- // Go through MutationPlan abstraction so that we can create local
indexes
- // with a connectionless connection (which makes testing easier).
- return new RowCountMutationPlan(plan.getContext(),
PhoenixStatement.Operation.UPSERT);
+ // Set the scan attributes that UngroupedAggregateRegionObserver will
switch on.
+ // For local indexes, the
BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and
+ // for global indexes PhoenixIndexCodec.INDEX_PROTO_MD attribute is
set to the serialized form of index
+ // metadata to build index rows from data table rows. For global
indexes, we also need to set (1) the
+ // BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to
signal UngroupedAggregateRegionObserver
+ // that this scan is for building global indexes and (2) the
MetaDataProtocol.PHOENIX_VERSION attribute
+ // that will be passed as a mutation attribute for the scanned
mutations that will be applied on
+ // the index table possibly remotely
+ ScanUtil.setClientVersion(dataTableScan,
MetaDataProtocol.PHOENIX_VERSION);
+ if (index.getIndexType() == PTable.IndexType.LOCAL) {
+
dataTableScan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO,
attribValue);
+ } else {
+ dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD,
attribValue);
+
dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES,
TRUE_BYTES);
+ ScanUtil.setClientVersion(dataTableScan,
MetaDataProtocol.PHOENIX_VERSION);
}
+ if (dataTable.isTransactional()) {
+ dataTableScan.setAttribute(BaseScannerRegionObserver.TX_STATE,
connection.getMutationState().encodeTransaction());
+ }
+
+ // Go through MutationPlan abstraction so that we can create local
indexes
+ // with a connectionless connection (which makes testing easier).
+ return new RowCountMutationPlan(queryPlan.getContext(),
PhoenixStatement.Operation.UPSERT);
}
}
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index dd95c8e..f5aaf42 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -299,6 +299,11 @@ public class GlobalIndexChecker implements
RegionCoprocessor, RegionObserver {
buildIndexScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD,
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES,
TRUE_BYTES);
buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK,
Bytes.toBytes(true));
+ // We want delete markers to be replayed during index rebuild.
+ buildIndexScan.setRaw(true);
+ buildIndexScan.setCacheBlocks(false);
+ buildIndexScan.readAllVersions();
+ buildIndexScan.setTimeRange(0, maxTimestamp);
}
// Rebuild the index row from the corresponding the row in the the
data table
// Get the data row key from the index row key
@@ -306,9 +311,6 @@ public class GlobalIndexChecker implements
RegionCoprocessor, RegionObserver {
buildIndexScan.withStartRow(dataRowKey, true);
buildIndexScan.withStopRow(dataRowKey, true);
buildIndexScan.setTimeRange(0, maxTimestamp);
- // If the data table row has been deleted then we want to delete
the corresponding index row too.
- // Thus, we are using a raw scan
- buildIndexScan.setRaw(true);
try (ResultScanner resultScanner =
dataHTable.getScanner(buildIndexScan)){
resultScanner.next();
} catch (Throwable t) {
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 1abcef4..5128c26 100644
---
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -82,18 +82,16 @@ public class PhoenixServerBuildIndexInputFormat<T extends
DBWritable> extends Ph
try (final Connection connection =
ConnectionUtil.getInputConnection(configuration, overridingProps)) {
PhoenixConnection phoenixConnection =
connection.unwrap(PhoenixConnection.class);
Long scn = (currentScnValue != null) ?
Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
- PTable indexTable =
PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
ServerBuildIndexCompiler compiler =
- new ServerBuildIndexCompiler(phoenixConnection,
dataTableFullName);
- MutationPlan plan = compiler.compile(indexTable);
- Scan scan = plan.getContext().getScan();
-
+ new ServerBuildIndexCompiler(phoenixConnection,
dataTableFullName, indexTableFullName);
+ MutationPlan plan = compiler.compile();
+ queryPlan = plan.getQueryPlan();
+ Scan scan = queryPlan.getContext().getScan();
try {
scan.setTimeRange(0, scn);
} catch (IOException e) {
throw new SQLException(e);
}
- queryPlan = plan.getQueryPlan();
// since we can't set a scn on connections with txn set TX_SCN
attribute so that the max time range is set by BaseScannerRegionObserver
if (txnScnValue != null) {
scan.setAttribute(BaseScannerRegionObserver.TX_SCN,
Bytes.toBytes(Long.valueOf(txnScnValue)));
diff --git
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index b379e14..01d2a83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1302,8 +1302,10 @@ public class MetaDataClient {
PostIndexDDLCompiler compiler = new
PostIndexDDLCompiler(connection, dataTableRef);
return compiler.compile(index);
} else {
- ServerBuildIndexCompiler compiler = new
ServerBuildIndexCompiler(connection, getFullTableName(dataTableRef));
- return compiler.compile(index);
+ ServerBuildIndexCompiler compiler = new
ServerBuildIndexCompiler(connection,
+
SchemaUtil.getTableName(dataTableRef.getTable().getSchemaName().getString(),
dataTableRef.getTable().getTableName().getString()),
+ SchemaUtil.getTableName(index.getSchemaName().getString(),
index.getTableName().getString()));
+ return compiler.compile();
}
}
@@ -1673,9 +1675,6 @@ public class MetaDataClient {
return buildIndexAtTimeStamp(table, statement.getTable());
}
- String dataTableFullName = SchemaUtil.getTableName(
- tableRef.getTable().getSchemaName().getString(),
- tableRef.getTable().getTableName().getString());
return buildIndex(table, tableRef);
}