Repository: phoenix
Updated Branches:
  refs/heads/4.x-HBase-0.98 4fc3f7545 -> 178405d70


PHOENIX-4530 Do not collect delete markers during major compaction of table 
with disabled mutable indexes


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/178405d7
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/178405d7
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/178405d7

Branch: refs/heads/4.x-HBase-0.98
Commit: 178405d7012b05a683da57f5f2e53480e1dd6aed
Parents: 4fc3f75
Author: Vincent Poon <vincentp...@apache.org>
Authored: Thu Feb 22 17:14:16 2018 -0800
Committer: Vincent Poon <vincentp...@apache.org>
Committed: Thu Feb 22 17:16:20 2018 -0800

----------------------------------------------------------------------
 .../PartialScannerResultsDisabledIT.java        | 193 ++++++++++++++++++
 .../UngroupedAggregateRegionObserverIT.java     | 199 -------------------
 .../phoenix/end2end/index/MutableIndexIT.java   |  57 ++++++
 .../end2end/index/PartialIndexRebuilderIT.java  |  39 ----
 .../UngroupedAggregateRegionObserver.java       | 130 +++++-------
 .../java/org/apache/phoenix/util/TestUtil.java  |  19 ++
 6 files changed, 314 insertions(+), 323 deletions(-)
----------------------------------------------------------------------
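In short, the fix drops the old postCompact hook (which permanently disabled any index left in a partial-rebuild state once a major compaction ran) and instead hooks preCompactScannerOpen so that a major compaction retains delete markers while a mutable index is disabled. The sketch below is a condensed restatement of the new hook from the diff further down; the doAs(login user) wrapper and the exact error handling of the committed change are omitted here for brevity, so treat it as illustrative rather than the literal committed code.

    // Condensed sketch of the new hook in UngroupedAggregateRegionObserver
    // (illustrative only; see the full diff below for the committed version).
    @Override
    public InternalScanner preCompactScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
            Store store, List<? extends KeyValueScanner> scanners, ScanType scanType,
            long earliestPutTs, InternalScanner s, CompactionRequest request) throws IOException {
        if (!request.isMajor()) {
            return s; // only major compactions drop delete markers
        }
        String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
        try (PhoenixConnection conn =
                QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
            PTable table = PhoenixRuntime.getTableNoCache(conn, fullTableName);
            // for an index table check the index itself, for a data table check all of its indexes
            List<PTable> indexes = PTableType.INDEX.equals(table.getType())
                    ? Lists.newArrayList(table) : table.getIndexes();
            for (PTable index : indexes) {
                if (index.getIndexDisableTimestamp() != 0) {
                    // an index is disabled: keep deleted cells so a later partial
                    // rebuild still sees the deletes it needs to replay
                    Scan scan = new Scan();
                    scan.setMaxVersions();
                    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
                            ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
                            HConstants.OLDEST_TIMESTAMP);
                }
            }
        } catch (Exception e) {
            // a non-Phoenix table or an unreachable catalog should not block compaction
        }
        return s;
    }

The MutableIndexIT.testCompactDisabledIndex test added below exercises exactly this path: a deleted row survives a major compaction while the index is disabled, and is cleaned up by the next major compaction once the index is active again.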


http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
new file mode 100644
index 0000000..59471dd
--- /dev/null
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/PartialScannerResultsDisabledIT.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
+ * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
+ * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
+ * for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.sql.Timestamp;
+import java.util.Properties;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.phoenix.util.IndexScrutiny;
+import org.apache.phoenix.util.SchemaUtil;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PartialScannerResultsDisabledIT extends ParallelStatsDisabledIT {
+    public static final String TEST_TABLE_DDL =
+            "CREATE TABLE IF NOT EXISTS %s\n" + "(\n" + "    ORGANIZATION_ID CHAR(15) NOT NULL,\n"
+                    + "    FEED_ELEMENT_ID CHAR(15) NOT NULL,\n"
+                    + "    CONTAINER_ID CHAR(15) NOT NULL,\n"
+                    + "    FEED_TYPE VARCHAR(1) NOT NULL, \n"
+                    + "    NETWORK_ID CHAR(15) NOT NULL,\n" + "    USER_ID CHAR(15) NOT NULL,\n"
+                    + "    CREATED_TIME TIMESTAMP,\n" + "    LAST_UPDATE TIMESTAMP,\n"
+                    + "    RELEVANCE_SCORE DOUBLE,\n" + "    FEED_ITEM_TYPE VARCHAR(1),\n"
+                    + "    FEED_ELEMENT_TYPE VARCHAR(1),\n"
+                    + "    FEED_ELEMENT_IS_SYS_GEN BOOLEAN,\n"
+                    + "    FEED_ELEMENT_STATUS VARCHAR(1),\n"
+                    + "    FEED_ELEMENT_VISIBILITY VARCHAR(1),\n" + "    PARENT_ID CHAR(15),\n"
+                    + "    CREATED_BY CHAR(15),\n" + "    BEST_COMMENT_ID CHAR(15),\n"
+                    + "    COMMENT_COUNT INTEGER,\n" + "    CONSTRAINT PK PRIMARY KEY\n" + "    (\n"
+                    + "        ORGANIZATION_ID,\n" + "        FEED_ELEMENT_ID,\n"
+                    + "        CONTAINER_ID,\n" + "        FEED_TYPE,\n" + "        NETWORK_ID,\n"
+                    + "        USER_ID\n" + "    )\n" + ") COLUMN_ENCODED_BYTES = 0";
+
+    public static final String INDEX_1_DDL =
+            "CREATE INDEX IF NOT EXISTS %s\n" + "ON %s (\n" + "    NETWORK_ID,\n"
+                    + "    CONTAINER_ID,\n" + "    FEED_TYPE,\n" + "    USER_ID,\n"
+                    + "    CREATED_TIME DESC,\n" + "    FEED_ELEMENT_ID DESC,\n"
+                    + "    CREATED_BY\n" + ") "
+                    + "    INCLUDE (\n" + "    FEED_ITEM_TYPE,\n"
+                    + "    FEED_ELEMENT_TYPE,\n" + "    FEED_ELEMENT_IS_SYS_GEN,\n"
+                    + "    FEED_ELEMENT_STATUS,\n" + "    FEED_ELEMENT_VISIBILITY,\n"
+                    + "    PARENT_ID,\n" + "    BEST_COMMENT_ID,\n" + "    COMMENT_COUNT\n" + ")";
+
+    private static final String UPSERT_INTO_DATA_TABLE =
+            "UPSERT INTO %s\n" + "(\n" + "    ORGANIZATION_ID,\n" + "    FEED_ELEMENT_ID,\n"
+                    + "    CONTAINER_ID,\n" + "    FEED_TYPE,\n" + "    NETWORK_ID,\n"
+                    + "    USER_ID,\n" + "    CREATED_TIME,\n" + "    LAST_UPDATE,\n"
+                    + "    FEED_ITEM_TYPE,\n" + "    FEED_ELEMENT_TYPE,\n"
+                    + "    FEED_ELEMENT_IS_SYS_GEN,\n" + "    FEED_ELEMENT_STATUS,\n"
+                    + "    FEED_ELEMENT_VISIBILITY,\n" + "    PARENT_ID,\n" + "    CREATED_BY,\n"
+                    + "    BEST_COMMENT_ID,\n" + "    COMMENT_COUNT\n" + ")"
+                    + "VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)";
+
+    private String dataTableName;
+    private String indexTableName;
+    private String schemaName;
+    private String dataTableFullName;
+    private static String indexTableFullName;
+    private static final Logger logger = LoggerFactory.getLogger(PartialScannerResultsDisabledIT.class);
+    private static Random random = new Random(1);
+    // background writer threads
+    private static Random sourceOfRandomness = new Random(0);
+    private static AtomicInteger upsertIdCounter = new AtomicInteger(1);
+
+    @Before
+    public void setup() throws Exception {
+        // create the tables
+        generateUniqueTableNames();
+        createTestTable(getUrl(), String.format(TEST_TABLE_DDL, dataTableFullName));
+        createTestTable(getUrl(), String.format(INDEX_1_DDL, indexTableName, dataTableFullName));
+    }
+    
+    @Test
+    public void testWithEnoughData() throws Exception {
+        try (Connection conn = DriverManager.getConnection(getUrl())) {
+            // Write enough data to trigger partial scanner results
+            // TODO: fewer rows would likely suffice if the config parameters that govern this are lowered.
+            writeSingleBatch(conn, 100, 20, dataTableFullName);
+            logger.info("Running scrutiny");
+            // Scrutinize the index to see if partial results are silently returned.
+            // In that case we'll get a false positive on the scrutiny run.
+            long rowCount = IndexScrutiny.scrutinizeIndex(conn, dataTableFullName, indexTableFullName);
+            assertEquals(2000, rowCount);
+        }
+    }
+
+    /**
+     * Simple select query with a fetch size that exceeds the result size. In that case the scan
+     * would start to produce partial result sets that, from the Phoenix perspective, are rows with
+     * NULL values.
+     * @throws SQLException
+     */
+    @Test
+    public void partialResultDuringSelect() throws SQLException {
+        String tableName = generateUniqueName();
+        Properties props = new Properties();
+        props.setProperty(HConstants.HBASE_CLIENT_SCANNER_MAX_RESULT_SIZE_KEY, "5");
+        int numRecords = 10;
+        try (Connection conn = DriverManager.getConnection(url, props)) {
+            conn.createStatement().execute(
+                    "CREATE TABLE " + tableName + " (PK1 INTEGER NOT NULL PRIMARY KEY, KV1 VARCHAR)");
+            int i = 0;
+            String upsert = "UPSERT INTO " + tableName + " VALUES (?, ?)";
+            PreparedStatement stmt = conn.prepareStatement(upsert);
+            while (i < numRecords) {
+                stmt.setInt(1, i);
+                stmt.setString(2, UUID.randomUUID().toString());
+                stmt.executeUpdate();
+                i++;
+            }
+            conn.commit();
+
+            String sql = "SELECT * FROM " + tableName;
+            // the tiny scanner max result size forces HBase to return partial rows,
+            // which Phoenix must stitch back together before returning them
+            Statement s = conn.createStatement();
+            s.setFetchSize(100);
+            ResultSet rs = s.executeQuery(sql);
+            int count = 0;
+            while (rs.next()) {
+                if (rs.getString(2) == null)
+                    fail("Null value because of partial row scan");
+                count++;
+            }
+            assertEquals(numRecords, count);
+        }
+    }
+
+    private static String randString(int length, Random random) {
+        return RandomStringUtils.randomAlphabetic(length);
+    }
+    
+    public static void writeSingleBatch(Connection connection, int batchSize, int numBatches, String tableName) throws Exception {
+        for (int j = 0; j < numBatches; j++) {
+            try (PreparedStatement statement =
+                    connection.prepareStatement(String.format(UPSERT_INTO_DATA_TABLE, tableName))) {
+                for (int i = 0; i < batchSize; i++) {
+                    int index = 0;
+                    String id = "" + upsertIdCounter.getAndIncrement();
+                    statement.setString(++index, id); // ORGANIZATION_ID
+                    statement.setString(++index, id); // FEED_ELEMENT_ID
+                    statement.setString(++index, id); // CONTAINER_ID
+                    statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_TYPE
+                    statement.setString(++index, randString(15, sourceOfRandomness)); // NETWORK_ID
+                    statement.setString(++index, randString(15, sourceOfRandomness)); // USER_ID
+                    statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // CREATED_TIME
+                    statement.setTimestamp(++index, new Timestamp(System.currentTimeMillis())); // LAST_UPDATE
+                    statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ITEM_TYPE
+                    statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_TYPE
+                    statement.setBoolean(++index, false); // FEED_ELEMENT_IS_SYS_GEN
+                    statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_STATUS
+                    statement.setString(++index, randString(1, sourceOfRandomness)); // FEED_ELEMENT_VISIBILITY
+                    statement.setString(++index, randString(15, sourceOfRandomness)); // PARENT_ID
+                    statement.setString(++index, randString(15, sourceOfRandomness)); // CREATED_BY
+                    statement.setString(++index, randString(15, sourceOfRandomness)); // BEST_COMMENT_ID
+                    statement.setInt(++index, random.nextInt()); // COMMENT_COUNT
+                    statement.execute();
+                }
+                connection.commit();
+            }
+        }
+    }
+
+    private void generateUniqueTableNames() {
+        schemaName = generateUniqueName();
+        dataTableName = generateUniqueName() + "_DATA";
+        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+        indexTableName = generateUniqueName() + "_IDX";
+        indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
deleted file mode 100644
index d64df2c..0000000
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/UngroupedAggregateRegionObserverIT.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.phoenix.end2end;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.never;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.log4j.Appender;
-import org.apache.log4j.Level;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.spi.LoggingEvent;
-import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
-import org.apache.phoenix.jdbc.PhoenixConnection;
-import org.apache.phoenix.schema.PIndexState;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
-import org.apache.phoenix.util.IndexUtil;
-import org.apache.phoenix.util.PropertiesUtil;
-import org.apache.phoenix.util.SchemaUtil;
-import org.apache.phoenix.util.TestUtil;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.runners.MockitoJUnitRunner;
-
-@RunWith(MockitoJUnitRunner.class)
-public class UngroupedAggregateRegionObserverIT extends ParallelStatsDisabledIT {
-
-    public static final String TEST_TABLE_DDL =
-            "CREATE TABLE IF NOT EXISTS %s\n" + "(\n" + "    ORGANIZATION_ID CHAR(15) NOT NULL,\n"
-                    + "    FEED_ELEMENT_ID CHAR(15) NOT NULL,\n"
-                    + "    CONTAINER_ID CHAR(15) NOT NULL,\n"
-                    + "    FEED_TYPE VARCHAR(1) NOT NULL, \n"
-                    + "    NETWORK_ID CHAR(15) NOT NULL,\n" + "    USER_ID CHAR(15) NOT NULL,\n"
-                    + "    CREATED_TIME TIMESTAMP,\n" + "    LAST_UPDATE TIMESTAMP,\n"
-                    + "    RELEVANCE_SCORE DOUBLE,\n" + "    FEED_ITEM_TYPE VARCHAR(1),\n"
-                    + "    FEED_ELEMENT_TYPE VARCHAR(1),\n"
-                    + "    FEED_ELEMENT_IS_SYS_GEN BOOLEAN,\n"
-                    + "    FEED_ELEMENT_STATUS VARCHAR(1),\n"
-                    + "    FEED_ELEMENT_VISIBILITY VARCHAR(1),\n" + "    PARENT_ID CHAR(15),\n"
-                    + "    CREATED_BY CHAR(15),\n" + "    BEST_COMMENT_ID CHAR(15),\n"
-                    + "    COMMENT_COUNT INTEGER,\n" + "    CONSTRAINT PK PRIMARY KEY\n" + "    (\n"
-                    + "        ORGANIZATION_ID,\n" + "        FEED_ELEMENT_ID,\n"
-                    + "        CONTAINER_ID,\n" + "        FEED_TYPE,\n" + "        NETWORK_ID,\n"
-                    + "        USER_ID\n" + "    )\n" + ") COLUMN_ENCODED_BYTES = 0";
-
-    public static final String INDEX_1_DDL =
-            "CREATE INDEX IF NOT EXISTS %s\n" + "ON %s (\n" + "    NETWORK_ID,\n"
-                    + "    CONTAINER_ID,\n" + "    FEED_TYPE,\n" + "    USER_ID,\n"
-                    + "    CREATED_TIME DESC,\n" + "    FEED_ELEMENT_ID DESC,\n"
-                    + "    CREATED_BY\n" + ") "
-                    + "    INCLUDE (\n" + "    FEED_ITEM_TYPE,\n"
-                    + "    FEED_ELEMENT_TYPE,\n" + "    FEED_ELEMENT_IS_SYS_GEN,\n"
-                    + "    FEED_ELEMENT_STATUS,\n" + "    FEED_ELEMENT_VISIBILITY,\n"
-                    + "    PARENT_ID,\n" + "    BEST_COMMENT_ID,\n" + "    COMMENT_COUNT\n" + ")";
-
-    private String dataTableName;
-    private String indexTableName;
-    private String schemaName;
-    private String dataTableFullName;
-    private static String indexTableFullName;
-
-    @Mock
-    private Appender mockAppender;
-
-    @Captor
-    private ArgumentCaptor<LoggingEvent> captorLoggingEvent;
-    private UngroupedAggregateRegionObserver ungroupedObserver;
-
-    @Before
-    public void setup() {
-        ungroupedObserver = new UngroupedAggregateRegionObserver();
-        ungroupedObserver.setCompactionConfig(PropertiesUtil.cloneConfig(config));
-    }
-
-    /**
-     * Tests the that post compact hook doesn't log any NPE for a System table
-     */
-    @Test
-    public void testPostCompactSystemSequence() throws Exception {
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            startCapturingIndexLog();
-            // run the post-compact hook
-            ungroupedObserver.clearTsOnDisabledIndexes("SYSTEM.SEQUENCE");
-            stopCapturingIndexLog();
-            // uneventful - nothing should be logged
-            Mockito.verify(mockAppender, never())
-                    .doAppend(captorLoggingEvent.capture());
-        }
-    }
-
-    /**
-     * Tests that calling the post compact hook on the data table permanently disables an index that
-     * is being rebuilt (i.e. already disabled or inactive)
-     */
-    @Test
-    public void testPostCompactDataTableDuringRebuild() throws Exception {
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            generateUniqueTableNames();
-            testRebuildPostCompact(conn, dataTableFullName);
-        }
-    }
-
-    /**
-     * Tests that calling the post compact hook on the index table permanently disables an index
-     * that is being rebuilt (i.e. already disabled or inactive)
-     */
-    @Test
-    public void testPostCompactIndexTableDuringRebuild() throws Exception {
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            generateUniqueTableNames();
-            testRebuildPostCompact(conn, indexTableFullName);
-        }
-    }
-
-    private void testRebuildPostCompact(Connection conn, String tableToCompact)
-            throws SQLException {
-        conn.createStatement().execute(
-            String.format(TEST_TABLE_DDL, dataTableFullName));
-        conn.createStatement().execute(String.format(INDEX_1_DDL,
-            indexTableName, dataTableFullName));
-        // disable the index, simulating an index write failure
-        PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
-        IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE,
-            EnvironmentEdgeManager.currentTimeMillis());
-
-        // run the post-compact hook on the data table
-        startCapturingIndexLog();
-        ungroupedObserver.clearTsOnDisabledIndexes(tableToCompact);
-        stopCapturingIndexLog();
-        // an event should've been logged
-        Mockito.verify(mockAppender).doAppend(captorLoggingEvent.capture());
-        LoggingEvent loggingEvent = captorLoggingEvent.getValue();
-        assertThat(loggingEvent.getLevel(), is(Level.INFO));
-        // index should be permanently disabled (disabletime of 0)
-        assertTrue(TestUtil.checkIndexState(pConn, indexTableFullName, PIndexState.DISABLE, 0L));
-    }
-
-    /**
-     * Tests that a non-Phoenix table (created purely through HBase) doesn't log a warning in
-     * postCompact
-     */
-    @Test
-    public void testPostCompactTableNotFound() throws Exception {
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            HBaseTestingUtility utility = getUtility();
-            String nonPhoenixTable = "NOT_A_PHOENIX_TABLE";
-            utility.getHBaseAdmin().createTable(utility.createTableDescriptor(nonPhoenixTable));
-            startCapturingIndexLog();
-            ungroupedObserver.clearTsOnDisabledIndexes(nonPhoenixTable);
-            stopCapturingIndexLog();
-            // a debug level event should've been logged
-            Mockito.verify(mockAppender).doAppend(captorLoggingEvent.capture());
-            LoggingEvent loggingEvent = captorLoggingEvent.getValue();
-            assertThat(loggingEvent.getLevel(), is(Level.DEBUG));
-        }
-    }
-
-    private void stopCapturingIndexLog() {
-        LogManager.getLogger(UngroupedAggregateRegionObserver.class).removeAppender(mockAppender);
-    }
-
-    private void startCapturingIndexLog() {
-        LogManager.getLogger(UngroupedAggregateRegionObserver.class).addAppender(mockAppender);
-    }
-
-    private void generateUniqueTableNames() {
-        schemaName = generateUniqueName();
-        dataTableName = generateUniqueName() + "_DATA";
-        dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
-        indexTableName = generateUniqueName() + "_IDX";
-        indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
-    }
-}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index 677cf92..600d0cd 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -40,14 +40,23 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.query.BaseTest;
 import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PTableKey;
+import org.apache.phoenix.util.ByteUtil;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.IndexUtil;
 import org.apache.phoenix.util.PhoenixRuntime;
 import org.apache.phoenix.util.PropertiesUtil;
 import org.apache.phoenix.util.QueryUtil;
@@ -742,6 +751,54 @@ public class MutableIndexIT extends ParallelStatsDisabledIT {
       }
   }
 
+  // Tests that if major compaction is run on a table with a disabled index,
+  // deleted cells are kept
+  @Test
+  public void testCompactDisabledIndex() throws Exception {
+      try (Connection conn = getConnection()) {
+          String schemaName = generateUniqueName();
+          String dataTableName = generateUniqueName() + "_DATA";
+          String dataTableFullName = SchemaUtil.getTableName(schemaName, dataTableName);
+          String indexTableName = generateUniqueName() + "_IDX";
+          String indexTableFullName = SchemaUtil.getTableName(schemaName, indexTableName);
+          conn.createStatement().execute(
+              String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL, dataTableFullName));
+          conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL,
+              indexTableName, dataTableFullName));
+
+          // insert a row, and delete it
+          PartialScannerResultsDisabledIT.writeSingleBatch(conn, 1, 1, dataTableFullName);
+          conn.createStatement().execute("DELETE FROM " + dataTableFullName);
+          conn.commit();
+
+          // disable the index, simulating an index write failure
+          PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
+          IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.DISABLE,
+              EnvironmentEdgeManager.currentTimeMillis());
+
+          // major compaction should not remove the deleted row
+          List<HRegion> regions = getUtility().getHBaseCluster().getRegions(TableName.valueOf(dataTableFullName));
+          HRegion hRegion = regions.get(0);
+          hRegion.flushcache();
+          HStore store = (HStore) hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
+          store.triggerMajorCompaction();
+          store.compactRecentForTestingAssumingDefaultPolicy(1);
+          HTableInterface dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName));
+          assertEquals(1, TestUtil.getRawRowCount(dataHTI));
+
+          // reenable the index
+          IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.INACTIVE,
+              EnvironmentEdgeManager.currentTimeMillis());
+          IndexUtil.updateIndexState(pConn, indexTableFullName, PIndexState.ACTIVE, 0L);
+
+          // now major compaction should remove the deleted row
+          store.triggerMajorCompaction();
+          store.compactRecentForTestingAssumingDefaultPolicy(1);
+          dataHTI = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName));
+          assertEquals(0, TestUtil.getRawRowCount(dataHTI));
+      }
+  }
+
 private void upsertRow(String dml, Connection tenantConn, int i) throws SQLException {
     PreparedStatement stmt = tenantConn.prepareStatement(dml);
       stmt.setString(1, "00000000000000" + String.valueOf(i));

http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
index 3961d32..46443e3 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/PartialIndexRebuilderIT.java
@@ -318,45 +318,6 @@ public class PartialIndexRebuilderIT extends BaseUniqueNamesOwnClusterIT {
         conn.commit();
         return hasInactiveIndex;
     }
-
-    @Test
-    public void testCompactionDuringRebuild() throws Throwable {
-        String schemaName = generateUniqueName();
-        String tableName = generateUniqueName();
-        String indexName1 = generateUniqueName();
-        String indexName2 = generateUniqueName();
-        final String fullTableName = SchemaUtil.getTableName(schemaName, tableName);
-        String fullIndexName1 = SchemaUtil.getTableName(schemaName, indexName1);
-        String fullIndexName2 = SchemaUtil.getTableName(schemaName, indexName2);
-        final MyClock clock = new MyClock(1000);
-        // Use our own clock to prevent race between partial rebuilder and compaction
-        EnvironmentEdgeManager.injectEdge(clock);
-        try (Connection conn = DriverManager.getConnection(getUrl())) {
-            conn.createStatement().execute("CREATE TABLE " + fullTableName + "(k INTEGER PRIMARY KEY, v1 INTEGER, v2 INTEGER) COLUMN_ENCODED_BYTES = 0, STORE_NULLS=true, GUIDE_POSTS_WIDTH=1000");
-            clock.time += 100;
-            conn.createStatement().execute("CREATE INDEX " + indexName1 + " ON " + fullTableName + " (v1) INCLUDE (v2)");
-            clock.time += 100;
-            conn.createStatement().execute("CREATE INDEX " + indexName2 + " ON " + fullTableName + " (v2) INCLUDE (v1)");
-            clock.time += 100;
-            conn.createStatement().execute("UPSERT INTO " + fullTableName + " VALUES(1, 2, 3)");
-            conn.commit();
-            clock.time += 100;
-            long disableTS = EnvironmentEdgeManager.currentTimeMillis();
-            HTableInterface metaTable = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES);
-            IndexUtil.updateIndexState(fullIndexName1, disableTS, metaTable, PIndexState.DISABLE);
-            IndexUtil.updateIndexState(fullIndexName2, disableTS, metaTable, PIndexState.DISABLE);
-            clock.time += 100;
-            TestUtil.doMajorCompaction(conn, fullIndexName1);
-            clock.time += 100;
-            assertTrue(TestUtil.checkIndexState(conn, fullIndexName1, PIndexState.DISABLE, 0L));
-            assertFalse(TestUtil.checkIndexState(conn, fullIndexName2, PIndexState.DISABLE, 0L));
-            TestUtil.doMajorCompaction(conn, fullTableName);
-            clock.time += 100;
-            assertTrue(TestUtil.checkIndexState(conn, fullIndexName2, PIndexState.DISABLE, 0L));
-        } finally {
-            EnvironmentEdgeManager.injectEdge(null);
-        }
-    }
 
     @Test
     @Repeat(5)

http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index b1b8056..8ef7285 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -68,10 +68,11 @@ import org.apache.hadoop.hbase.ipc.controller.InterRegionServerIndexRpcControlle
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.KeyValueScanner;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreScanner;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
@@ -104,14 +105,12 @@ import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
 import org.apache.phoenix.schema.PColumn;
-import org.apache.phoenix.schema.PIndexState;
 import org.apache.phoenix.schema.PRow;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.PTableImpl;
 import org.apache.phoenix.schema.PTableType;
 import org.apache.phoenix.schema.RowKeySchema;
 import org.apache.phoenix.schema.SortOrder;
-import org.apache.phoenix.schema.TableNotFoundException;
 import org.apache.phoenix.schema.TableRef;
 import org.apache.phoenix.schema.ValueSchema.Field;
 import org.apache.phoenix.schema.stats.StatisticsCollectionRunTracker;
@@ -146,7 +145,6 @@ import org.apache.phoenix.util.StringUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Throwables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
@@ -986,87 +984,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
         }
     }
 
-    @Override
-    public void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
-            final StoreFile resultFile, CompactionRequest request) throws IOException {
-        // If we're compacting all files, then delete markers are removed
-        // and we must permanently disable an index that needs to be
-        // partially rebuild because we're potentially losing the information
-        // we need to successfully rebuilt it.
-        if (request.isMajor()) {
-            // Compaction and split upcalls run with the effective user context of the requesting user.
-            // This will lead to failure of cross cluster RPC if the effective user is not
-            // the login user. Switch to the login user context to ensure we have the expected
-            // security context.
-            try {
-                UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<Void>() {
-                    @Override
-                    public Void run() throws Exception {
-                        String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
-                        clearTsOnDisabledIndexes(fullTableName);
-                        return null;
-                    }
-                });
-            } catch (InterruptedException ie) {
-                Thread.interrupted();
-                throw new IOException(ie);
-            }
-        }
-    }
-
-    @VisibleForTesting
-    public void clearTsOnDisabledIndexes(final String fullTableName) {
-        try (PhoenixConnection conn =
-                QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
-            String baseTable = fullTableName;
-            PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable);
-            List<PTable> indexes;
-            // if it's an index table, we just need to check if it's disabled
-            if (PTableType.INDEX.equals(table.getType())) {
-                indexes = Lists.newArrayList(table.getIndexes());
-                indexes.add(table);
-            } else {
-                // for a data table, check all its indexes
-                indexes = table.getIndexes();
-            }
-            // FIXME need handle views and indexes on views as well
-            // if any index is disabled, we won't have all the data for a rebuild after compaction
-            for (PTable index : indexes) {
-                if (index.getIndexDisableTimestamp() != 0) {
-                    try {
-                        logger.info(
-                            "Major compaction running while index on table is disabled.  Clearing index disable timestamp: "
-                                    + index);
-                        IndexUtil.updateIndexState(conn, index.getName().getString(),
-                            PIndexState.DISABLE, Long.valueOf(0L));
-                    } catch (SQLException e) {
-                        logger.warn(
-                            "Unable to permanently disable index " + index.getName().getString(),
-                            e);
-                    }
-                }
-            }
-        } catch (Exception e) {
-            if (e instanceof TableNotFoundException) {
-                logger.debug("Ignoring HBase table that is not a Phoenix table: " + fullTableName);
-                // non-Phoenix HBase tables won't be found, do nothing
-                return;
-            }
-            // If we can't reach the stats table, don't interrupt the normal
-            // compaction operation, just log a warning.
-            if (logger.isWarnEnabled()) {
-                logger.warn("Unable to permanently disable indexes being partially rebuild for "
-                        + fullTableName,
-                    e);
-            }
-        }
-    }
-
-    @VisibleForTesting
-    public void setCompactionConfig(Configuration compactionConfig) {
-        this.compactionConfig = compactionConfig;
-    }
-
     private static PTable deserializeTable(byte[] b) {
         try {
             PTableProtos.PTable ptableProto = PTableProtos.PTable.parseFrom(b);
@@ -1424,4 +1341,47 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
     protected boolean isRegionObserverFor(Scan scan) {
         return scan.getAttribute(BaseScannerRegionObserver.UNGROUPED_AGG) != null;
     }
+
+    @Override
+    public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+            final Store store, final List<? extends KeyValueScanner> scanners, ScanType scanType,
+            long earliestPutTs, final InternalScanner s, final CompactionRequest request) throws IOException {
+        // Compaction and split upcalls run with the effective user context of the requesting user.
+        // This will lead to failure of cross cluster RPC if the effective user is not
+        // the login user. Switch to the login user context to ensure we have the expected
+        // security context.
+        try {
+            return UserGroupInformation.getLoginUser().doAs(new PrivilegedExceptionAction<InternalScanner>() {
+                @Override
+                public InternalScanner run() throws Exception {
+                    // If the index is disabled, keep the deleted cells so the rebuild doesn't corrupt the index
+                    if (request.isMajor()) {
+                        String fullTableName = c.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();
+                            try (PhoenixConnection conn =
+                                    QueryUtil.getConnectionOnServer(compactionConfig).unwrap(PhoenixConnection.class)) {
+                            String baseTable = fullTableName;
+                            PTable table = PhoenixRuntime.getTableNoCache(conn, baseTable);
+                            List<PTable> indexes = PTableType.INDEX.equals(table.getType()) ? Lists.newArrayList(table) : table.getIndexes();
+                            // FIXME need to handle views and indexes on views as well
+                            for (PTable index : indexes) {
+                                if (index.getIndexDisableTimestamp() != 0) {
+                                    logger.info(
+                                        "Modifying major compaction scanner to retain deleted cells for a table with disabled index: "
+                                                + baseTable);
+                                    Scan scan = new Scan();
+                                    scan.setMaxVersions();
+                                    return new StoreScanner(store, store.getScanInfo(), scan, scanners,
+                                        ScanType.COMPACT_RETAIN_DELETES, store.getSmallestReadPoint(),
+                                        HConstants.OLDEST_TIMESTAMP);
+                                }
+                            }
+                        }
+                    }
+                    return s;
+                }
+            });
+        } catch (InterruptedException e) {
+            throw new IOException(e);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/178405d7/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
index 7be2b6f..12c8f96 100644
--- a/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
+++ b/phoenix-core/src/test/java/org/apache/phoenix/util/TestUtil.java
@@ -853,6 +853,25 @@ public class TestUtil {
         System.out.println("-----------------------------------------------");
     }
 
+    public static int getRawRowCount(HTableInterface table) throws IOException {
+        Scan s = new Scan();
+        s.setRaw(true);
+        s.setMaxVersions();
+        int rows = 0;
+        try (ResultScanner scanner = table.getScanner(s)) {
+            Result result = null;
+            while ((result = scanner.next()) != null) {
+                rows++;
+                CellScanner cellScanner = result.cellScanner();
+                Cell current = null;
+                while (cellScanner.advance()) {
+                    current = cellScanner.current();
+                }
+            }
+        }
+        return rows;
+    }
+
     public static void dumpIndexStatus(Connection conn, String indexName) throws IOException, SQLException {
         try (HTableInterface table = conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME_BYTES)) {
             System.out.println("************ dumping index status for " + indexName + " **************");
