This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new 27ef302  PHOENIX-5535 Index rebuilds via 
UngroupedAggregateRegionObserver should replay delete markers
27ef302 is described below

commit 27ef3024cd3cd3bcf521cd932deb66166969f321
Author: Kadir <[email protected]>
AuthorDate: Sun Oct 20 01:02:22 2019 -0700

    PHOENIX-5535 Index rebuilds via UngroupedAggregateRegionObserver should 
replay delete markers
---
 .../org/apache/phoenix/end2end/IndexToolIT.java    |  72 +++++-
 .../apache/phoenix/compile/PostDDLCompiler.java    | 253 ++++++++++++---------
 .../phoenix/compile/ServerBuildIndexCompiler.java  | 109 +++++----
 .../apache/phoenix/index/GlobalIndexChecker.java   |   8 +-
 .../PhoenixServerBuildIndexInputFormat.java        |  10 +-
 .../org/apache/phoenix/schema/MetaDataClient.java  |   9 +-
 6 files changed, 278 insertions(+), 183 deletions(-)

diff --git 
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java 
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
index a4b7bdc..9827140 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/IndexToolIT.java
@@ -29,6 +29,7 @@ import java.sql.DriverManager;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.sql.Types;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -39,6 +40,7 @@ import java.util.UUID;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.*;
@@ -46,6 +48,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.phoenix.end2end.index.PartialIndexRebuilderIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.index.IndexTool;
 import org.apache.phoenix.query.ConnectionQueryServices;
@@ -74,10 +77,12 @@ import org.junit.runners.Parameterized.Parameters;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @RunWith(Parameterized.class)
 public class IndexToolIT extends BaseUniqueNamesOwnClusterIT {
-
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(PartialIndexRebuilderIT.class);
     private final boolean localIndex;
     private final boolean mutable;
     private final boolean transactional;
@@ -251,6 +256,71 @@ public class IndexToolIT extends 
BaseUniqueNamesOwnClusterIT {
         }
     }
 
+    private void setEveryNthRowWithNull(int nrows, int nthRowNull, 
PreparedStatement stmt) throws Exception {
+        for (int i = 0; i < nrows; i++) {
+            stmt.setInt(1, i);
+            stmt.setInt(2, i * 10);
+            if (i % nthRowNull != 0) {
+                stmt.setInt(3, 9000 + i * nthRowNull);
+            } else {
+                stmt.setNull(3, Types.INTEGER);
+            }
+            stmt.execute();
+        }
+    }
+
+    @Test
+    public void testWithSetNull() throws Exception {
+        // This test is for building non-transactional mutable global indexes 
with direct api
+        if (localIndex || transactional || !mutable) {
+            return;
+        }
+        // This tests the cases where a column having a null value is 
overwritten with a not null value and vice versa;
+        // and after that the index table is still rebuilt correctly
+        final int NROWS = 2 * 3 * 5 * 7;
+        String schemaName = generateUniqueName();
+        String dataTableName = generateUniqueName();
+        String dataTableFullName = SchemaUtil.getTableName(schemaName, 
dataTableName);
+        String indexTableName = generateUniqueName();
+        String indexTableFullName = SchemaUtil.getTableName(schemaName, 
indexTableName);
+        Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+        try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+            String stmString1 =
+                    "CREATE TABLE " + dataTableFullName
+                            + " (ID INTEGER NOT NULL PRIMARY KEY, VAL INTEGER, 
ZIP INTEGER) "
+                            + tableDDLOptions;
+            conn.createStatement().execute(stmString1);
+            String upsertStmt = "UPSERT INTO " + dataTableFullName + " 
VALUES(?,?,?)";
+            PreparedStatement stmt = conn.prepareStatement(upsertStmt);
+            setEveryNthRowWithNull(NROWS, 2, stmt);
+            conn.commit();
+            setEveryNthRowWithNull(NROWS, 3, stmt);
+            conn.commit();
+            String stmtString2 =
+                    String.format(
+                            "CREATE %s INDEX %s ON %s (VAL) INCLUDE (ZIP) 
ASYNC ",
+                            (localIndex ? "LOCAL" : ""), indexTableName, 
dataTableFullName);
+            conn.createStatement().execute(stmtString2);
+            // Run the index MR job and verify that the index table is built 
correctly
+            IndexTool indexTool = runIndexTool(directApi, useSnapshot, 
schemaName, dataTableName, indexTableName, null, 0, new String[0]);
+            assertEquals(NROWS, 
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, 
dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+
+            // Repeat the test with compaction
+            setEveryNthRowWithNull(NROWS, 5, stmt);
+            conn.commit();
+            setEveryNthRowWithNull(NROWS, 7, stmt);
+            conn.commit();
+            TestUtil.doMajorCompaction(conn, dataTableFullName);
+            // Run the index MR job and verify that the index table is built 
correctly
+            indexTool = runIndexTool(directApi, useSnapshot, schemaName, 
dataTableName, indexTableName, null, 0, new String[0]);
+            assertEquals(NROWS, 
indexTool.getJob().getCounters().findCounter(INPUT_RECORDS).getValue());
+            actualRowCount = IndexScrutiny.scrutinizeIndex(conn, 
dataTableFullName, indexTableFullName);
+            assertEquals(NROWS, actualRowCount);
+        }
+    }
+
     @Test
     public void testBuildSecondaryIndexAndScrutinize() throws Exception {
         // This test is for building non-transactional global indexes with 
direct api
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
index a74c5f1..04a3188 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/compile/PostDDLCompiler.java
@@ -72,6 +72,7 @@ import com.google.common.collect.Lists;
 public class PostDDLCompiler {
     private final PhoenixConnection connection;
     private final Scan scan;
+    private PostDDLMutationPlan mutationPlan = null;
 
     public PostDDLCompiler(PhoenixConnection connection) {
         this(connection, new Scan());
@@ -91,7 +92,12 @@ public class PostDDLCompiler {
             new MultipleTableRefColumnResolver(tableRefs),
                 scan,
                 new SequenceManager(statement));
-        return new PostDDLMutationPlan(context, tableRefs, timestamp, emptyCF, 
deleteList, projectCFs);
+        this.mutationPlan = new PostDDLMutationPlan(context, tableRefs, 
timestamp, emptyCF, deleteList, projectCFs);
+        return this.mutationPlan;
+    }
+
+    public QueryPlan getQueryPlan(TableRef tableRef) throws SQLException {
+        return mutationPlan.getQueryPlan(tableRef);
     }
 
     private static class MultipleTableRefColumnResolver implements 
ColumnResolver {
@@ -165,11 +171,11 @@ public class PostDDLCompiler {
             this.projectCFs = projectCFs;
         }
 
-        @Override
-        public MutationState execute() throws SQLException {
+        public QueryPlan getQueryPlan(final TableRef tableRef) throws 
SQLException {
             if (tableRefs.isEmpty()) {
-                return new MutationState(0, 1000, connection);
+                return null;
             }
+            QueryPlan plan = null;
             boolean wasAutoCommit = connection.getAutoCommit();
             try {
                 connection.setAutoCommit(true);
@@ -181,127 +187,152 @@ public class PostDDLCompiler {
                  * 3) updating the necessary rows to have an empty KV
                  * 4) updating table stats
                  */
-                long totalMutationCount = 0;
-                for (final TableRef tableRef : tableRefs) {
-                    Scan scan = ScanUtil.newScan(context.getScan());
-                    SelectStatement select = SelectStatement.COUNT_ONE;
-                    // We need to use this tableRef
-                    ColumnResolver resolver = new 
SingleTableRefColumnResolver(tableRef);
-                    PhoenixStatement statement = new 
PhoenixStatement(connection);
-                    StatementContext context = new StatementContext(statement, 
resolver, scan, new SequenceManager(statement));
-                    long ts = timestamp;
-                    // FIXME: DDL operations aren't transactional, so we're 
basing the timestamp on a server timestamp.
-                    // Not sure what the fix should be. We don't need conflict 
detection nor filtering of invalid transactions
-                    // in this case, so maybe this is ok.
-                    if (ts!= HConstants.LATEST_TIMESTAMP && 
tableRef.getTable().isTransactional()) {
-                        ts = TransactionUtil.convertToNanoseconds(ts);
-                    }
-                    ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), 
ts);
-                    if (emptyCF != null) {
-                        scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, 
emptyCF);
-                        
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, 
EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
-                    }
-                    ServerCache cache = null;
-                    try {
-                        if (deleteList != null) {
-                            if (deleteList.isEmpty()) {
-                                
scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
-                                // In the case of a row deletion, add index 
metadata so mutable secondary indexing works
-                                /* TODO: we currently manually run a scan to 
delete the index data here
-                                ImmutableBytesWritable ptr = 
context.getTempPtr();
-                                tableRef.getTable().getIndexMaintainers(ptr);
-                                if (ptr.getLength() > 0) {
-                                    IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
-                                    cache = 
client.addIndexMetadataCache(context.getScanRanges(), ptr);
-                                    byte[] uuidValue = cache.getId();
-                                    
scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
-                                }
-                                */
-                            } else {
-                                // In the case of the empty key value column 
family changing, do not send the index
-                                // metadata, as we're currently managing this 
from the client. It's possible for the
-                                // data empty column family to stay the same, 
while the index empty column family
-                                // changes.
-                                PColumn column = deleteList.get(0);
-                                byte[] cq = column.getColumnQualifierBytes();
-                                if (emptyCF == null) {
-                                    
scan.addColumn(column.getFamilyName().getBytes(), cq);
-                                }
-                                
scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, 
column.getFamilyName().getBytes());
-                                
scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq);
-                            }
-                        }
-                        List<byte[]> columnFamilies = 
Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
-                        if (projectCFs == null) {
-                            for (PColumnFamily family : 
tableRef.getTable().getColumnFamilies()) {
-                                
columnFamilies.add(family.getName().getBytes());
+
+                Scan scan = ScanUtil.newScan(context.getScan());
+                SelectStatement select = SelectStatement.COUNT_ONE;
+                // We need to use this tableRef
+                ColumnResolver resolver = new 
SingleTableRefColumnResolver(tableRef);
+                PhoenixStatement statement = new PhoenixStatement(connection);
+                StatementContext context = new StatementContext(statement, 
resolver, scan, new SequenceManager(statement));
+                long ts = timestamp;
+                // FIXME: DDL operations aren't transactional, so we're basing 
the timestamp on a server timestamp.
+                // Not sure what the fix should be. We don't need conflict 
detection nor filtering of invalid transactions
+                // in this case, so maybe this is ok.
+                if (ts!= HConstants.LATEST_TIMESTAMP && 
tableRef.getTable().isTransactional()) {
+                    ts = TransactionUtil.convertToNanoseconds(ts);
+                }
+                ScanUtil.setTimeRange(scan, scan.getTimeRange().getMin(), ts);
+                if (emptyCF != null) {
+                    scan.setAttribute(BaseScannerRegionObserver.EMPTY_CF, 
emptyCF);
+                    
scan.setAttribute(BaseScannerRegionObserver.EMPTY_COLUMN_QUALIFIER, 
EncodedColumnsUtil.getEmptyKeyValueInfo(tableRef.getTable()).getFirst());
+                }
+                ServerCache cache = null;
+                try {
+                    if (deleteList != null) {
+                        if (deleteList.isEmpty()) {
+                            
scan.setAttribute(BaseScannerRegionObserver.DELETE_AGG, QueryConstants.TRUE);
+                            // In the case of a row deletion, add index 
metadata so mutable secondary indexing works
+                            /* TODO: we currently manually run a scan to 
delete the index data here
+                            ImmutableBytesWritable ptr = context.getTempPtr();
+                            tableRef.getTable().getIndexMaintainers(ptr);
+                            if (ptr.getLength() > 0) {
+                                IndexMetaDataCacheClient client = new 
IndexMetaDataCacheClient(connection, tableRef);
+                                cache = 
client.addIndexMetadataCache(context.getScanRanges(), ptr);
+                                byte[] uuidValue = cache.getId();
+                                
scan.setAttribute(PhoenixIndexCodec.INDEX_UUID, uuidValue);
                             }
+                            */
                         } else {
-                            for (byte[] projectCF : projectCFs) {
-                                columnFamilies.add(projectCF);
+                            // In the case of the empty key value column 
family changing, do not send the index
+                            // metadata, as we're currently managing this from 
the client. It's possible for the
+                            // data empty column family to stay the same, 
while the index empty column family
+                            // changes.
+                            PColumn column = deleteList.get(0);
+                            byte[] cq = column.getColumnQualifierBytes();
+                            if (emptyCF == null) {
+                                
scan.addColumn(column.getFamilyName().getBytes(), cq);
                             }
+                            
scan.setAttribute(BaseScannerRegionObserver.DELETE_CF, 
column.getFamilyName().getBytes());
+                            
scan.setAttribute(BaseScannerRegionObserver.DELETE_CQ, cq);
                         }
-                        // Need to project all column families into the scan, 
since we haven't yet created our empty key value
-                        RowProjector projector = 
ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, 
GroupBy.EMPTY_GROUP_BY);
-                        context.getAggregationManager().compile(context, 
GroupBy.EMPTY_GROUP_BY);
-                        // Explicitly project these column families and don't 
project the empty key value,
-                        // since at this point we haven't added the empty key 
value everywhere.
-                        if (columnFamilies != null) {
-                            scan.getFamilyMap().clear();
-                            for (byte[] family : columnFamilies) {
-                                scan.addFamily(family);
-                            }
-                            projector = new RowProjector(projector,false);
+                    }
+                    List<byte[]> columnFamilies = 
Lists.newArrayListWithExpectedSize(tableRef.getTable().getColumnFamilies().size());
+                    if (projectCFs == null) {
+                        for (PColumnFamily family : 
tableRef.getTable().getColumnFamilies()) {
+                            columnFamilies.add(family.getName().getBytes());
                         }
-                        // Ignore exceptions due to not being able to resolve 
any view columns,
-                        // as this just means the view is invalid. Continue on 
and try to perform
-                        // any other Post DDL operations.
-                        try {
-                            // Since dropping a VIEW does not affect the 
underlying data, we do
-                            // not need to pass through the view statement 
here.
-                            WhereCompiler.compile(context, select); // Push 
where clause into scan
-                        } catch (ColumnFamilyNotFoundException e) {
-                            continue;
-                        } catch (ColumnNotFoundException e) {
-                            continue;
-                        } catch (AmbiguousColumnException e) {
-                            continue;
+                    } else {
+                        for (byte[] projectCF : projectCFs) {
+                            columnFamilies.add(projectCF);
                         }
-                        QueryPlan plan = new AggregatePlan(context, select, 
tableRef, projector, null, null,
-                                OrderBy.EMPTY_ORDER_BY, null, 
GroupBy.EMPTY_GROUP_BY, null, null);
+                    }
+                    // Need to project all column families into the scan, 
since we haven't yet created our empty key value
+                    RowProjector projector = 
ProjectionCompiler.compile(context, SelectStatement.COUNT_ONE, 
GroupBy.EMPTY_GROUP_BY);
+                    context.getAggregationManager().compile(context, 
GroupBy.EMPTY_GROUP_BY);
+                    // Explicitly project these column families and don't 
project the empty key value,
+                    // since at this point we haven't added the empty key 
value everywhere.
+                    if (columnFamilies != null) {
+                        scan.getFamilyMap().clear();
+                        for (byte[] family : columnFamilies) {
+                            scan.addFamily(family);
+                        }
+                        projector = new RowProjector(projector,false);
+                    }
+                    // Ignore exceptions due to not being able to resolve any 
view columns,
+                    // as this just means the view is invalid. Continue on and 
try to perform
+                    // any other Post DDL operations.
+                    try {
+                        // Since dropping a VIEW does not affect the 
underlying data, we do
+                        // not need to pass through the view statement here.
+                        WhereCompiler.compile(context, select); // Push where 
clause into scan
+                    } catch (ColumnFamilyNotFoundException e) {
+                        return null;
+                    } catch (ColumnNotFoundException e) {
+                        return null;
+                    } catch (AmbiguousColumnException e) {
+                        return null;
+                    }
+                    plan = new AggregatePlan(context, select, tableRef, 
projector, null, null,
+                            OrderBy.EMPTY_ORDER_BY, null, 
GroupBy.EMPTY_GROUP_BY, null, null);
+                } finally {
+                    if (cache != null) { // Remove server cache if there is one
+                        cache.close();
+                    }
+                }
+            } finally {
+                if (!wasAutoCommit) connection.setAutoCommit(wasAutoCommit);
+            }
+            return plan;
+        }
+
+        @Override
+        public MutationState execute() throws SQLException {
+            if (tableRefs.isEmpty()) {
+                return new MutationState(0, 1000, connection);
+            }
+            boolean wasAutoCommit = connection.getAutoCommit();
+            try {
+                connection.setAutoCommit(true);
+                SQLException sqlE = null;
+                /*
+                 * Handles:
+                 * 1) deletion of all rows for a DROP TABLE and subsequently 
deletion of all rows for a DROP INDEX;
+                 * 2) deletion of all column values for a ALTER TABLE DROP 
COLUMN
+                 * 3) updating the necessary rows to have an empty KV
+                 * 4) updating table stats
+                 */
+                long totalMutationCount = 0;
+                for (final TableRef tableRef : tableRefs) {
+                    QueryPlan plan = getQueryPlan(tableRef);
+                    if (plan == null)
+                        continue;
+                    try {
+                        ResultIterator iterator = plan.iterator();
                         try {
-                            ResultIterator iterator = plan.iterator();
+                            Tuple row = iterator.next();
+                            ImmutableBytesWritable ptr = context.getTempPtr();
+                            totalMutationCount += 
(Long)plan.getProjector().getColumnProjector(0).getValue(row, PLong.INSTANCE, 
ptr);
+                        } catch (SQLException e) {
+                            sqlE = e;
+                        } finally {
                             try {
-                                Tuple row = iterator.next();
-                                ImmutableBytesWritable ptr = 
context.getTempPtr();
-                                totalMutationCount += 
(Long)projector.getColumnProjector(0).getValue(row, PLong.INSTANCE, ptr);
+                                iterator.close();
                             } catch (SQLException e) {
-                                sqlE = e;
+                                if (sqlE == null) {
+                                    sqlE = e;
+                                } else {
+                                    sqlE.setNextException(e);
+                                }
                             } finally {
-                                try {
-                                    iterator.close();
-                                } catch (SQLException e) {
-                                    if (sqlE == null) {
-                                        sqlE = e;
-                                    } else {
-                                        sqlE.setNextException(e);
-                                    }
-                                } finally {
-                                    if (sqlE != null) {
-                                        throw sqlE;
-                                    }
+                                if (sqlE != null) {
+                                    throw sqlE;
                                 }
                             }
-                        } catch (TableNotFoundException e) {
-                            // Ignore and continue, as HBase throws when table 
hasn't been written to
-                            // FIXME: Remove if this is fixed in 0.96
-                        }
-                    } finally {
-                        if (cache != null) { // Remove server cache if there 
is one
-                            cache.close();
                         }
+                    } catch (TableNotFoundException e) {
+                        // Ignore and continue, as HBase throws when table 
hasn't been written to
+                        // FIXME: Remove if this is fixed in 0.96
                     }
-
                 }
                 final long count = totalMutationCount;
                 return new MutationState(1, 1000, connection) {
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
index 7d1c1b4..40cea2c 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/compile/ServerBuildIndexCompiler.java
@@ -19,15 +19,16 @@ package org.apache.phoenix.compile;
 
 import java.sql.SQLException;
 import java.util.Collections;
-import java.util.List;
 
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.coprocessor.BaseScannerRegionObserver;
 import org.apache.phoenix.coprocessor.MetaDataProtocol;
 import org.apache.phoenix.execute.MutationState;
 import org.apache.phoenix.hbase.index.covered.update.ColumnReference;
+import org.apache.phoenix.hbase.index.util.IndexManagementUtil;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.jdbc.PhoenixConnection;
@@ -36,24 +37,22 @@ import org.apache.phoenix.schema.*;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.schema.types.PLong;
 import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.IndexUtil;
+import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.ScanUtil;
-import org.apache.phoenix.util.StringUtil;
-
-import com.google.common.collect.Lists;
 
 import static org.apache.phoenix.schema.types.PDataType.TRUE_BYTES;
 
 
 /**
- * Class that compiles plan to generate initial data values after a DDL 
command for
+ * Class that compiles queryPlan to generate initial data values after a DDL 
command for
  * index table.
  */
 public class ServerBuildIndexCompiler {
     private final PhoenixConnection connection;
-    private final String tableName;
+    private final String indexTableFullName;
+    private final String dataTableFullName;
     private PTable dataTable;
-    private QueryPlan plan;
+    private QueryPlan queryPlan;
 
     private class RowCountMutationPlan extends BaseMutationPlan {
         private RowCountMutationPlan(StatementContext context, 
PhoenixStatement.Operation operation) {
@@ -62,7 +61,7 @@ public class ServerBuildIndexCompiler {
         @Override
         public MutationState execute() throws SQLException {
             connection.getMutationState().commitDDLFence(dataTable);
-            Tuple tuple = plan.iterator().next();
+            Tuple tuple = queryPlan.iterator().next();
             long rowCount = 0;
             if (tuple != null) {
                 Cell kv = tuple.getValue(0);
@@ -78,61 +77,57 @@ public class ServerBuildIndexCompiler {
 
         @Override
         public QueryPlan getQueryPlan() {
-            return plan;
+            return queryPlan;
         }
     };
     
-    public ServerBuildIndexCompiler(PhoenixConnection connection, String 
tableName) {
+    public ServerBuildIndexCompiler(PhoenixConnection connection, String 
dataTableFullName, String indexTableFullName) {
         this.connection = connection;
-        this.tableName = tableName;
+        this.dataTableFullName = dataTableFullName;
+        this.indexTableFullName = indexTableFullName;
     }
 
-    public MutationPlan compile(PTable index) throws SQLException {
-        try (final PhoenixStatement statement = new 
PhoenixStatement(connection)) {
-            String query = "SELECT count(*) FROM " + tableName;
-            this.plan = statement.compileQuery(query);
-            TableRef tableRef = plan.getTableRef();
-            Scan scan = plan.getContext().getScan();
-            ImmutableBytesWritable ptr = new ImmutableBytesWritable();
-            dataTable = tableRef.getTable();
-            if (index.getIndexType() == PTable.IndexType.GLOBAL &&  
dataTable.isTransactional()) {
-                throw new IllegalArgumentException(
-                        "ServerBuildIndexCompiler does not support global 
indexes on transactional tables");
-            }
-            IndexMaintainer.serialize(dataTable, ptr, 
Collections.singletonList(index), plan.getContext().getConnection());
-            // Set the scan attributes that UngroupedAggregateRegionObserver 
will switch on.
-            // For local indexes, the 
BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and
-            // for global indexes PhoenixIndexCodec.INDEX_PROTO_MD attribute 
is set to the serialized form of index
-            // metadata to build index rows from data table rows. For global 
indexes, we also need to set (1) the
-            // BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to 
signal UngroupedAggregateRegionObserver
-            // that this scan is for building global indexes and (2) the 
MetaDataProtocol.PHOENIX_VERSION attribute
-            // that will be passed as a mutation attribute for the scanned 
mutations that will be applied on
-            // the index table possibly remotely
-            if (index.getIndexType() == PTable.IndexType.LOCAL) {
-                
scan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, 
ByteUtil.copyKeyBytesIfNecessary(ptr));
-            } else {
-                scan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, 
ByteUtil.copyKeyBytesIfNecessary(ptr));
-                scan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, 
TRUE_BYTES);
-                ScanUtil.setClientVersion(scan, 
MetaDataProtocol.PHOENIX_VERSION);
-            }
-            // By default, we'd use a FirstKeyOnly filter as nothing else 
needs to be projected for count(*).
-            // However, in this case, we need to project all of the data 
columns that contribute to the index.
-            IndexMaintainer indexMaintainer = 
index.getIndexMaintainer(dataTable, connection);
-            for (ColumnReference columnRef : indexMaintainer.getAllColumns()) {
-                if (index.getImmutableStorageScheme() == 
PTable.ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS) {
-                    scan.addFamily(columnRef.getFamily());
-                } else {
-                    scan.addColumn(columnRef.getFamily(), 
columnRef.getQualifier());
-                }
-            }
-
-            if (dataTable.isTransactional()) {
-                scan.setAttribute(BaseScannerRegionObserver.TX_STATE, 
connection.getMutationState().encodeTransaction());
-            }
+    public MutationPlan compile() throws SQLException {
+        PTable index = PhoenixRuntime.getTable(connection, indexTableFullName);
+        dataTable = PhoenixRuntime.getTable(connection, dataTableFullName);
+        if (index.getIndexType() == PTable.IndexType.GLOBAL &&  
dataTable.isTransactional()) {
+            throw new IllegalArgumentException(
+                    "ServerBuildIndexCompiler does not support global indexes 
on transactional tables");
+        }
+        PostDDLCompiler compiler = new PostDDLCompiler(connection);
+        TableRef dataTableRef = new TableRef(dataTable);
+        compiler.compile(Collections.singletonList(dataTableRef),
+                null, null, null, HConstants.LATEST_TIMESTAMP);
+        queryPlan = compiler.getQueryPlan(dataTableRef);
+        Scan dataTableScan = 
IndexManagementUtil.newLocalStateScan(queryPlan.getContext().getScan(),
+                Collections.singletonList(index.getIndexMaintainer(dataTable, 
connection)));
+        ImmutableBytesWritable indexMetaDataPtr = new 
ImmutableBytesWritable(ByteUtil.EMPTY_BYTE_ARRAY);
+        IndexMaintainer.serializeAdditional(dataTable, indexMetaDataPtr, 
Collections.singletonList(index),
+                connection);
+        byte[] attribValue = 
ByteUtil.copyKeyBytesIfNecessary(indexMetaDataPtr);
 
-            // Go through MutationPlan abstraction so that we can create local 
indexes
-            // with a connectionless connection (which makes testing easier).
-            return new RowCountMutationPlan(plan.getContext(), 
PhoenixStatement.Operation.UPSERT);
+        // Set the scan attributes that UngroupedAggregateRegionObserver will 
switch on.
+        // For local indexes, the 
BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO attribute, and
+        // for global indexes PhoenixIndexCodec.INDEX_PROTO_MD attribute is 
set to the serialized form of index
+        // metadata to build index rows from data table rows. For global 
indexes, we also need to set (1) the
+        // BaseScannerRegionObserver.REBUILD_INDEXES attribute in order to 
signal UngroupedAggregateRegionObserver
+        // that this scan is for building global indexes and (2) the 
MetaDataProtocol.PHOENIX_VERSION attribute
+        // that will be passed as a mutation attribute for the scanned 
mutations that will be applied on
+        // the index table possibly remotely
+        ScanUtil.setClientVersion(dataTableScan, 
MetaDataProtocol.PHOENIX_VERSION);
+        if (index.getIndexType() == PTable.IndexType.LOCAL) {
+            
dataTableScan.setAttribute(BaseScannerRegionObserver.LOCAL_INDEX_BUILD_PROTO, 
attribValue);
+        } else {
+            dataTableScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, 
attribValue);
+            
dataTableScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, 
TRUE_BYTES);
+            ScanUtil.setClientVersion(dataTableScan, 
MetaDataProtocol.PHOENIX_VERSION);
         }
+        if (dataTable.isTransactional()) {
+            dataTableScan.setAttribute(BaseScannerRegionObserver.TX_STATE, 
connection.getMutationState().encodeTransaction());
+        }
+
+        // Go through MutationPlan abstraction so that we can create local 
indexes
+        // with a connectionless connection (which makes testing easier).
+        return new RowCountMutationPlan(queryPlan.getContext(), 
PhoenixStatement.Operation.UPSERT);
     }
 }
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java 
b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
index dd95c8e..f5aaf42 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/index/GlobalIndexChecker.java
@@ -299,6 +299,11 @@ public class GlobalIndexChecker implements 
RegionCoprocessor, RegionObserver {
                 buildIndexScan.setAttribute(PhoenixIndexCodec.INDEX_PROTO_MD, 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD));
                 
buildIndexScan.setAttribute(BaseScannerRegionObserver.REBUILD_INDEXES, 
TRUE_BYTES);
                 
buildIndexScan.setAttribute(BaseScannerRegionObserver.SKIP_REGION_BOUNDARY_CHECK,
 Bytes.toBytes(true));
+                // We want delete markers to be replayed during index rebuild.
+                buildIndexScan.setRaw(true);
+                buildIndexScan.setCacheBlocks(false);
+                buildIndexScan.readAllVersions();
+                buildIndexScan.setTimeRange(0, maxTimestamp);
             }
            // Rebuild the index row from the corresponding row in the data table
             // Get the data row key from the index row key
@@ -306,9 +311,6 @@ public class GlobalIndexChecker implements 
RegionCoprocessor, RegionObserver {
             buildIndexScan.withStartRow(dataRowKey, true);
             buildIndexScan.withStopRow(dataRowKey, true);
             buildIndexScan.setTimeRange(0, maxTimestamp);
-            // If the data table row has been deleted then we want to delete 
the corresponding index row too.
-            // Thus, we are using a raw scan
-            buildIndexScan.setRaw(true);
             try (ResultScanner resultScanner = 
dataHTable.getScanner(buildIndexScan)){
                 resultScanner.next();
             } catch (Throwable t) {
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
index 1abcef4..5128c26 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixServerBuildIndexInputFormat.java
@@ -82,18 +82,16 @@ public class PhoenixServerBuildIndexInputFormat<T extends 
DBWritable> extends Ph
         try (final Connection connection = 
ConnectionUtil.getInputConnection(configuration, overridingProps)) {
             PhoenixConnection phoenixConnection = 
connection.unwrap(PhoenixConnection.class);
             Long scn = (currentScnValue != null) ? 
Long.valueOf(currentScnValue) : EnvironmentEdgeManager.currentTimeMillis();
-            PTable indexTable = 
PhoenixRuntime.getTableNoCache(phoenixConnection, indexTableFullName);
             ServerBuildIndexCompiler compiler =
-                    new ServerBuildIndexCompiler(phoenixConnection, 
dataTableFullName);
-            MutationPlan plan = compiler.compile(indexTable);
-            Scan scan = plan.getContext().getScan();
-
+                    new ServerBuildIndexCompiler(phoenixConnection, 
dataTableFullName, indexTableFullName);
+            MutationPlan plan = compiler.compile();
+            queryPlan = plan.getQueryPlan();
+            Scan scan = queryPlan.getContext().getScan();
             try {
                 scan.setTimeRange(0, scn);
             } catch (IOException e) {
                 throw new SQLException(e);
             }
-            queryPlan = plan.getQueryPlan();
             // since we can't set a scn on connections with txn set TX_SCN 
attribute so that the max time range is set by BaseScannerRegionObserver
             if (txnScnValue != null) {
                 scan.setAttribute(BaseScannerRegionObserver.TX_SCN, 
Bytes.toBytes(Long.valueOf(txnScnValue)));
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java 
b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
index b379e14..01d2a83 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/MetaDataClient.java
@@ -1302,8 +1302,10 @@ public class MetaDataClient {
             PostIndexDDLCompiler compiler = new 
PostIndexDDLCompiler(connection, dataTableRef);
             return compiler.compile(index);
         } else {
-            ServerBuildIndexCompiler compiler = new 
ServerBuildIndexCompiler(connection, getFullTableName(dataTableRef));
-            return compiler.compile(index);
+            ServerBuildIndexCompiler compiler = new 
ServerBuildIndexCompiler(connection,
+                    
SchemaUtil.getTableName(dataTableRef.getTable().getSchemaName().getString(), 
dataTableRef.getTable().getTableName().getString()),
+                    SchemaUtil.getTableName(index.getSchemaName().getString(), 
index.getTableName().getString()));
+            return compiler.compile();
         }
     }
 
@@ -1673,9 +1675,6 @@ public class MetaDataClient {
             return buildIndexAtTimeStamp(table, statement.getTable());
         }
 
-        String dataTableFullName = SchemaUtil.getTableName(
-                tableRef.getTable().getSchemaName().getString(),
-                tableRef.getTable().getTableName().getString());
         return buildIndex(table, tableRef);
     }
 

Reply via email to