This is an automated email from the ASF dual-hosted git repository.
rajeshbabu pushed a commit to branch 4.x-HBase-1.4
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x-HBase-1.4 by this push:
new 0c15dcc PHOENIX-5308 Unable to run some end2end tests in a real
cluster, mainly the ones accessing HBase internals from MiniHBaseCluster
or custom coprocessors (Rajeshbabu)
0c15dcc is described below
commit 0c15dccb73857d8197af317bccba6d283124b8f5
Author: Rajeshbabu Chintaguntla <Rajeshbabu Chintaguntla>
AuthorDate: Mon Jun 17 15:41:08 2019 +0530
PHOENIX-5308 Unable to run some end2end tests in a real cluster, mainly
the ones accessing HBase internals from MiniHBaseCluster or custom
coprocessors (Rajeshbabu)
---
.../end2end/ConcurrentMutationsExtendedIT.java | 404 +++++++++++++++++++++
.../phoenix/end2end/ConcurrentMutationsIT.java | 343 +----------------
.../end2end/index/MutableIndexExtendedIT.java | 184 ++++++++++
.../phoenix/end2end/index/MutableIndexIT.java | 176 ---------
.../phoenix/end2end/join/HashJoinCacheIT.java | 3 +
5 files changed, 592 insertions(+), 518 deletions(-)
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
new file mode 100644
index 0000000..571961d
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java
@@ -0,0 +1,404 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver;
+import org.apache.hadoop.hbase.regionserver.MiniBatchOperationInProgress;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.util.*;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import java.io.IOException;
+import java.sql.*;
+import java.util.Properties;
+import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.*;
+
+@RunWith(RunUntilFailure.class) @Category(NeedsOwnMiniClusterTest.class)
+public class ConcurrentMutationsExtendedIT extends ParallelStatsDisabledIT {
+
+ private static final Random RAND = new Random(5);
+ private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_";
+ private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_";
+ private static final int ROW_LOCK_WAIT_TIME = 10000;
+
+ private final Object lock = new Object();
+
+ @Test
+ public void testSynchronousDeletesAndUpsertValues() throws Exception {
+ final String tableName = generateUniqueName();
+ final String indexName = generateUniqueName();
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE " + tableName
+ + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER,
CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0");
+ TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " +
tableName + "(v1)");
+ final CountDownLatch doneSignal = new CountDownLatch(2);
+ Runnable r1 = new Runnable() {
+
+ @Override public void run() {
+ try {
+ Properties props =
PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ for (int i = 0; i < 50; i++) {
+ Thread.sleep(20);
+ synchronized (lock) {
+ PhoenixConnection conn = null;
+ try {
+ conn =
+ DriverManager.getConnection(getUrl(),
props)
+
.unwrap(PhoenixConnection.class);
+ conn.setAutoCommit(true);
+ conn.createStatement().execute("DELETE FROM "
+ tableName);
+ } finally {
+ if (conn != null) conn.close();
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Runnable r2 = new Runnable() {
+
+ @Override public void run() {
+ try {
+ Properties props =
PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
+ int nRowsToUpsert = 1000;
+ for (int i = 0; i < nRowsToUpsert; i++) {
+ synchronized (lock) {
+ PhoenixConnection conn = null;
+ try {
+ conn =
+ DriverManager.getConnection(getUrl(),
props)
+
.unwrap(PhoenixConnection.class);
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES
(" + (i % 10)
+ + ", 0, 1)");
+ if ((i % 20) == 0 || i == nRowsToUpsert - 1) {
+ conn.commit();
+ }
+ } finally {
+ if (conn != null) conn.close();
+ }
+ }
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Thread t1 = new Thread(r1);
+ t1.start();
+ Thread t2 = new Thread(r2);
+ t2.start();
+
+ doneSignal.await(60, TimeUnit.SECONDS);
+ IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+ }
+
+ @Test
+ public void testConcurrentDeletesAndUpsertValues() throws Exception {
+ final String tableName = generateUniqueName();
+ final String indexName = generateUniqueName();
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE " + tableName
+ + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER,
CONSTRAINT pk PRIMARY KEY (k1,k2))");
+ TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " +
tableName + "(v1)");
+ final CountDownLatch doneSignal = new CountDownLatch(2);
+ Runnable r1 = new Runnable() {
+
+ @Override public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.setAutoCommit(true);
+ for (int i = 0; i < 50; i++) {
+ Thread.sleep(20);
+ conn.createStatement().execute("DELETE FROM " +
tableName);
+ }
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ Thread.interrupted();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Runnable r2 = new Runnable() {
+
+ @Override public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ for (int i = 0; i < 1000; i++) {
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES (" + (i
% 10) + ", 0, 1)");
+ if ((i % 20) == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Thread t1 = new Thread(r1);
+ t1.start();
+ Thread t2 = new Thread(r2);
+ t2.start();
+
+ doneSignal.await(60, TimeUnit.SECONDS);
+ IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
+ }
+
+ @Test @Repeat(5)
+ public void testConcurrentUpserts() throws Exception {
+ int nThreads = 4;
+ final int batchSize = 200;
+ final int nRows = 51;
+ final int nIndexValues = 23;
+ final String tableName = generateUniqueName();
+ final String indexName = generateUniqueName();
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE " + tableName
+ + "(k1 INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER,
CONSTRAINT pk PRIMARY KEY (k1,k2)) COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
+ TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " +
tableName + "(v1)");
+ final CountDownLatch doneSignal = new CountDownLatch(nThreads);
+ Runnable[] runnables = new Runnable[nThreads];
+ for (int i = 0; i < nThreads; i++) {
+ runnables[i] = new Runnable() {
+
+ @Override public void run() {
+ try {
+ Connection conn =
DriverManager.getConnection(getUrl());
+ for (int i = 0; i < 10000; i++) {
+ boolean isNull = RAND.nextBoolean();
+ int randInt = RAND.nextInt() % nIndexValues;
+ conn.createStatement().execute(
+ "UPSERT INTO " + tableName + " VALUES (" +
(i % nRows) + ", 0, "
+ + (isNull ? null : randInt) + ")");
+ if ((i % batchSize) == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ } catch (SQLException e) {
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ }
+ for (int i = 0; i < nThreads; i++) {
+ Thread t = new Thread(runnables[i]);
+ t.start();
+ }
+
+ assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName,
indexName);
+ assertEquals(nRows, actualRowCount);
+ }
+
+ @Test
+ public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception {
+ final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName();
+ final String indexName = generateUniqueName();
+ Connection conn = DriverManager.getConnection(getUrl());
+
+ conn.createStatement().execute("CREATE TABLE " + tableName
+ + "(k VARCHAR PRIMARY KEY, v INTEGER) COLUMN_ENCODED_BYTES =
0");
+ TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " +
tableName + "(v)");
+ final CountDownLatch doneSignal = new CountDownLatch(2);
+ final String[] failedMsg = new String[1];
+ Runnable r1 = new Runnable() {
+
+ @Override public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES
('foo',0)");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES
('foo',1)");
+ conn.commit();
+ } catch (Exception e) {
+ failedMsg[0] = e.getMessage();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Runnable r2 = new Runnable() {
+
+ @Override public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES
('foo',2)");
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES
('foo',3)");
+ conn.commit();
+ } catch (Exception e) {
+ failedMsg[0] = e.getMessage();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Thread t1 = new Thread(r1);
+ t1.start();
+ Thread t2 = new Thread(r2);
+ t2.start();
+
+ doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS);
+ assertNull(failedMsg[0], failedMsg[0]);
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName,
indexName);
+ assertEquals(1, actualRowCount);
+ }
+
+ @Test
+ public void testLockUntilMVCCAdvanced() throws Exception {
+ final String tableName = MVCC_LOCK_TEST_TABLE_PREFIX +
generateUniqueName();
+ final String indexName = generateUniqueName();
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement().execute("CREATE TABLE " + tableName
+ + "(k VARCHAR PRIMARY KEY, v INTEGER) COLUMN_ENCODED_BYTES =
0");
+ conn.createStatement().execute("CREATE INDEX " + indexName + " ON " +
tableName + "(v,k)");
+ conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('foo',0)");
+ conn.commit();
+ TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
+ final CountDownLatch doneSignal = new CountDownLatch(2);
+ final String[] failedMsg = new String[1];
+ Runnable r1 = new Runnable() {
+
+ @Override public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES
('foo',1)");
+ conn.commit();
+ } catch (Exception e) {
+ failedMsg[0] = e.getMessage();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Runnable r2 = new Runnable() {
+
+ @Override public void run() {
+ try {
+ Connection conn = DriverManager.getConnection(getUrl());
+ conn.createStatement()
+ .execute("UPSERT INTO " + tableName + " VALUES
('foo',2)");
+ conn.commit();
+ } catch (Exception e) {
+ failedMsg[0] = e.getMessage();
+ throw new RuntimeException(e);
+ } finally {
+ doneSignal.countDown();
+ }
+ }
+
+ };
+ Thread t1 = new Thread(r1);
+ t1.start();
+ Thread t2 = new Thread(r2);
+ t2.start();
+
+ doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS);
+ long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName,
indexName);
+ assertEquals(1, actualRowCount);
+ }
+
+ public static class DelayingRegionObserver extends SimpleRegionObserver {
+ private volatile boolean lockedTableRow;
+
+ @Override public void
postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws
IOException {
+ try {
+ String tableName =
c.getEnvironment().getRegionInfo().getTable().getNameAsString();
+ if (tableName.startsWith(MVCC_LOCK_TEST_TABLE_PREFIX)) {
+ Thread.sleep(ROW_LOCK_WAIT_TIME
+ / 2); // Wait long enough that they'll both have
the same mvcc
+ }
+ } catch (InterruptedException e) {
+ }
+ }
+
+ @Override public void
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
+ MiniBatchOperationInProgress<Mutation> miniBatchOp) throws
HBaseIOException {
+ try {
+ String tableName =
c.getEnvironment().getRegionInfo().getTable().getNameAsString();
+ if (tableName.startsWith(LOCK_TEST_TABLE_PREFIX)) {
+ if (lockedTableRow) {
+ throw new DoNotRetryIOException(
+ "Expected lock in preBatchMutate to be
exclusive, but it wasn't for row "
+ + Bytes
+
.toStringBinary(miniBatchOp.getOperation(0).getRow()));
+ }
+ lockedTableRow = true;
+ Thread.sleep(ROW_LOCK_WAIT_TIME + 2000);
+ }
+ Thread.sleep(Math.abs(RAND.nextInt()) % 10);
+ } catch (InterruptedException e) {
+ } finally {
+ lockedTableRow = false;
+ }
+
+ }
+ }
+
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
index d1f30c6..f312df0 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsIT.java
@@ -56,12 +56,7 @@ import org.junit.runner.RunWith;
@RunWith(RunUntilFailure.class)
public class ConcurrentMutationsIT extends ParallelStatsDisabledIT {
- private static final Random RAND = new Random(5);
- private static final String MVCC_LOCK_TEST_TABLE_PREFIX = "MVCCLOCKTEST_";
- private static final String LOCK_TEST_TABLE_PREFIX = "LOCKTEST_";
- private static final int ROW_LOCK_WAIT_TIME = 10000;
-
- private final Object lock = new Object();
+
private static class MyClock extends EnvironmentEdge {
public volatile long time;
@@ -77,342 +72,6 @@ public class ConcurrentMutationsIT extends
ParallelStatsDisabledIT {
}
@Test
- public void testSynchronousDeletesAndUpsertValues() throws Exception {
- final String tableName = generateUniqueName();
- final String indexName = generateUniqueName();
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute("CREATE TABLE " + tableName + "(k1
INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY
(k1,k2)) COLUMN_ENCODED_BYTES = 0");
- TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
- conn.createStatement().execute("CREATE INDEX " + indexName + " ON " +
tableName + "(v1)");
- final CountDownLatch doneSignal = new CountDownLatch(2);
- Runnable r1 = new Runnable() {
-
- @Override
- public void run() {
- try {
- Properties props =
PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
- for (int i = 0; i < 50; i++) {
- Thread.sleep(20);
- synchronized (lock) {
- PhoenixConnection conn = null;
- try {
- conn = DriverManager.getConnection(getUrl(),
props).unwrap(PhoenixConnection.class);
- conn.setAutoCommit(true);
- conn.createStatement().execute("DELETE FROM "
+ tableName);
- } finally {
- if (conn != null) conn.close();
- }
- }
- }
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- Thread.interrupted();
- throw new RuntimeException(e);
- } finally {
- doneSignal.countDown();
- }
- }
-
- };
- Runnable r2 = new Runnable() {
-
- @Override
- public void run() {
- try {
- Properties props =
PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES);
- int nRowsToUpsert = 1000;
- for (int i = 0; i < nRowsToUpsert; i++) {
- synchronized(lock) {
- PhoenixConnection conn = null;
- try {
- conn = DriverManager.getConnection(getUrl(),
props).unwrap(PhoenixConnection.class);
- conn.createStatement().execute("UPSERT INTO "
+ tableName + " VALUES (" + (i % 10) + ", 0, 1)");
- if ((i % 20) == 0 || i == nRowsToUpsert-1 ) {
- conn.commit();
- }
- } finally {
- if (conn != null) conn.close();
- }
- }
- }
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } finally {
- doneSignal.countDown();
- }
- }
-
- };
- Thread t1 = new Thread(r1);
- t1.start();
- Thread t2 = new Thread(r2);
- t2.start();
-
- doneSignal.await(60, TimeUnit.SECONDS);
- IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
- }
-
- @Test
- public void testConcurrentDeletesAndUpsertValues() throws Exception {
- final String tableName = generateUniqueName();
- final String indexName = generateUniqueName();
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute("CREATE TABLE " + tableName + "(k1
INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY
(k1,k2))");
- TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
- conn.createStatement().execute("CREATE INDEX " + indexName + " ON " +
tableName + "(v1)");
- final CountDownLatch doneSignal = new CountDownLatch(2);
- Runnable r1 = new Runnable() {
-
- @Override
- public void run() {
- try {
- Connection conn = DriverManager.getConnection(getUrl());
- conn.setAutoCommit(true);
- for (int i = 0; i < 50; i++) {
- Thread.sleep(20);
- conn.createStatement().execute("DELETE FROM " +
tableName);
- }
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } catch (InterruptedException e) {
- Thread.interrupted();
- throw new RuntimeException(e);
- } finally {
- doneSignal.countDown();
- }
- }
-
- };
- Runnable r2 = new Runnable() {
-
- @Override
- public void run() {
- try {
- Connection conn = DriverManager.getConnection(getUrl());
- for (int i = 0; i < 1000; i++) {
- conn.createStatement().execute("UPSERT INTO " +
tableName + " VALUES (" + (i % 10) + ", 0, 1)");
- if ((i % 20) == 0) {
- conn.commit();
- }
- }
- conn.commit();
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } finally {
- doneSignal.countDown();
- }
- }
-
- };
- Thread t1 = new Thread(r1);
- t1.start();
- Thread t2 = new Thread(r2);
- t2.start();
-
- doneSignal.await(60, TimeUnit.SECONDS);
- IndexScrutiny.scrutinizeIndex(conn, tableName, indexName);
- }
-
- @Test
- @Repeat(5)
- public void testConcurrentUpserts() throws Exception {
- int nThreads = 4;
- final int batchSize = 200;
- final int nRows = 51;
- final int nIndexValues = 23;
- final String tableName = generateUniqueName();
- final String indexName = generateUniqueName();
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute("CREATE TABLE " + tableName + "(k1
INTEGER NOT NULL, k2 INTEGER NOT NULL, v1 INTEGER, CONSTRAINT pk PRIMARY KEY
(k1,k2)) COLUMN_ENCODED_BYTES = 0, VERSIONS=1");
- TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
- conn.createStatement().execute("CREATE INDEX " + indexName + " ON " +
tableName + "(v1)");
- final CountDownLatch doneSignal = new CountDownLatch(nThreads);
- Runnable[] runnables = new Runnable[nThreads];
- for (int i = 0; i < nThreads; i++) {
- runnables[i] = new Runnable() {
-
- @Override
- public void run() {
- try {
- Connection conn = DriverManager.getConnection(getUrl());
- for (int i = 0; i < 10000; i++) {
- boolean isNull = RAND.nextBoolean();
- int randInt = RAND.nextInt() % nIndexValues;
- conn.createStatement().execute("UPSERT INTO " +
tableName + " VALUES (" + (i % nRows) + ", 0, " + (isNull ? null : randInt) +
")");
- if ((i % batchSize) == 0) {
- conn.commit();
- }
- }
- conn.commit();
- } catch (SQLException e) {
- throw new RuntimeException(e);
- } finally {
- doneSignal.countDown();
- }
- }
-
- };
- }
- for (int i = 0; i < nThreads; i++) {
- Thread t = new Thread(runnables[i]);
- t.start();
- }
-
- assertTrue("Ran out of time", doneSignal.await(120, TimeUnit.SECONDS));
- long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName,
indexName);
- assertEquals(nRows, actualRowCount);
- }
-
- @Test
- public void testRowLockDuringPreBatchMutateWhenIndexed() throws Exception {
- final String tableName = LOCK_TEST_TABLE_PREFIX + generateUniqueName();
- final String indexName = generateUniqueName();
- Connection conn = DriverManager.getConnection(getUrl());
-
- conn.createStatement().execute("CREATE TABLE " + tableName + "(k
VARCHAR PRIMARY KEY, v INTEGER) COLUMN_ENCODED_BYTES = 0");
- TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
- conn.createStatement().execute("CREATE INDEX " + indexName + " ON " +
tableName + "(v)");
- final CountDownLatch doneSignal = new CountDownLatch(2);
- final String[] failedMsg = new String[1];
- Runnable r1 = new Runnable() {
-
- @Override
- public void run() {
- try {
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute("UPSERT INTO " + tableName
+ " VALUES ('foo',0)");
- conn.createStatement().execute("UPSERT INTO " + tableName
+ " VALUES ('foo',1)");
- conn.commit();
- } catch (Exception e) {
- failedMsg[0] = e.getMessage();
- throw new RuntimeException(e);
- } finally {
- doneSignal.countDown();
- }
- }
-
- };
- Runnable r2 = new Runnable() {
-
- @Override
- public void run() {
- try {
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute("UPSERT INTO " + tableName
+ " VALUES ('foo',2)");
- conn.createStatement().execute("UPSERT INTO " + tableName
+ " VALUES ('foo',3)");
- conn.commit();
- } catch (Exception e) {
- failedMsg[0] = e.getMessage();
- throw new RuntimeException(e);
- } finally {
- doneSignal.countDown();
- }
- }
-
- };
- Thread t1 = new Thread(r1);
- t1.start();
- Thread t2 = new Thread(r2);
- t2.start();
-
- doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS);
- assertNull(failedMsg[0], failedMsg[0]);
- long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName,
indexName);
- assertEquals(1, actualRowCount);
- }
-
- @Test
- public void testLockUntilMVCCAdvanced() throws Exception {
- final String tableName = MVCC_LOCK_TEST_TABLE_PREFIX +
generateUniqueName();
- final String indexName = generateUniqueName();
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute("CREATE TABLE " + tableName + "(k
VARCHAR PRIMARY KEY, v INTEGER) COLUMN_ENCODED_BYTES = 0");
- conn.createStatement().execute("CREATE INDEX " + indexName + " ON " +
tableName + "(v,k)");
- conn.createStatement().execute("UPSERT INTO " + tableName + " VALUES
('foo',0)");
- conn.commit();
- TestUtil.addCoprocessor(conn, tableName, DelayingRegionObserver.class);
- final CountDownLatch doneSignal = new CountDownLatch(2);
- final String[] failedMsg = new String[1];
- Runnable r1 = new Runnable() {
-
- @Override
- public void run() {
- try {
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute("UPSERT INTO " + tableName
+ " VALUES ('foo',1)");
- conn.commit();
- } catch (Exception e) {
- failedMsg[0] = e.getMessage();
- throw new RuntimeException(e);
- } finally {
- doneSignal.countDown();
- }
- }
-
- };
- Runnable r2 = new Runnable() {
-
- @Override
- public void run() {
- try {
- Connection conn = DriverManager.getConnection(getUrl());
- conn.createStatement().execute("UPSERT INTO " + tableName
+ " VALUES ('foo',2)");
- conn.commit();
- } catch (Exception e) {
- failedMsg[0] = e.getMessage();
- throw new RuntimeException(e);
- } finally {
- doneSignal.countDown();
- }
- }
-
- };
- Thread t1 = new Thread(r1);
- t1.start();
- Thread t2 = new Thread(r2);
- t2.start();
-
- doneSignal.await(ROW_LOCK_WAIT_TIME + 5000, TimeUnit.SECONDS);
- long actualRowCount = IndexScrutiny.scrutinizeIndex(conn, tableName,
indexName);
- assertEquals(1, actualRowCount);
- }
-
- public static class DelayingRegionObserver extends SimpleRegionObserver {
- private volatile boolean lockedTableRow;
-
- @Override
- public void
postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
- try {
- String tableName =
c.getEnvironment().getRegionInfo().getTable().getNameAsString();
- if (tableName.startsWith(MVCC_LOCK_TEST_TABLE_PREFIX)) {
- Thread.sleep(ROW_LOCK_WAIT_TIME/2); // Wait long enough
that they'll both have the same mvcc
- }
- } catch (InterruptedException e) {
- }
- }
-
- @Override
- public void
preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws HBaseIOException {
- try {
- String tableName =
c.getEnvironment().getRegionInfo().getTable().getNameAsString();
- if (tableName.startsWith(LOCK_TEST_TABLE_PREFIX)) {
- if (lockedTableRow) {
- throw new DoNotRetryIOException("Expected lock in
preBatchMutate to be exclusive, but it wasn't for row " +
Bytes.toStringBinary(miniBatchOp.getOperation(0).getRow()));
- }
- lockedTableRow = true;
- Thread.sleep(ROW_LOCK_WAIT_TIME + 2000);
- }
- Thread.sleep(Math.abs(RAND.nextInt()) % 10);
- } catch (InterruptedException e) {
- } finally {
- lockedTableRow = false;
- }
-
- }
- }
-
- @Test
@Ignore("PHOENIX-4058 Generate correct index updates when DeleteColumn
processed before Put with same timestamp")
public void testSetIndexedColumnToNullAndValueAtSameTS() throws Exception {
try {
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexExtendedIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexExtendedIT.java
new file mode 100644
index 0000000..8676da0
--- /dev/null
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexExtendedIT.java
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end.index;
+
+import jline.internal.Log;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
+import org.apache.phoenix.query.BaseTest;
+import org.apache.phoenix.query.ConnectionQueryServices;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.schema.PIndexState;
+import org.apache.phoenix.util.*;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.sql.Connection;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES;
+import static org.junit.Assert.*;
+
+@RunWith(Parameterized.class)
+@Category(NeedsOwnMiniClusterTest.class)
+public class MutableIndexExtendedIT extends ParallelStatsDisabledIT {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MutableIndexIT.class);
+
+ protected final boolean localIndex;
+ protected final String tableDDLOptions;
+
+ public MutableIndexExtendedIT(Boolean localIndex, String txProvider,
Boolean columnEncoded) {
+ this.localIndex = localIndex;
+ StringBuilder optionBuilder = new StringBuilder();
+ if (txProvider != null) {
+ optionBuilder
+ .append("TRANSACTIONAL=true," +
PhoenixDatabaseMetaData.TRANSACTION_PROVIDER
+ + "='" + txProvider + "'");
+ }
+ if (!columnEncoded) {
+ if (optionBuilder.length() != 0) optionBuilder.append(",");
+ optionBuilder.append("COLUMN_ENCODED_BYTES=0");
+ }
+ this.tableDDLOptions = optionBuilder.toString();
+ }
+
+ private static Connection getConnection(Properties props) throws
SQLException {
+
props.setProperty(QueryServices.INDEX_MUTATE_BATCH_SIZE_THRESHOLD_ATTRIB,
+ Integer.toString(1));
+ Connection conn = DriverManager.getConnection(getUrl(), props);
+ return conn;
+ }
+
+ protected static Connection getConnection() throws SQLException {
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ return getConnection(props);
+ }
+
+ @Parameterized.Parameters(name =
"MutableIndexExtendedIT_localIndex={0},transactionProvider={1},columnEncoded={2}")
+ // name is used by failsafe as file name in reports
+ public static Collection<Object[]> data() {
+ return TestUtil.filterTxParamData(Arrays.asList(
+ new Object[][] { { false, null, false }, { false, null, true },
+ { false, "TEPHRA", false }, { false, "TEPHRA", true },
+ { false, "OMID", false }, { true, null, false }, {
true, null, true },
+ { true, "TEPHRA", false }, { true, "TEPHRA", true },
}), 1);
+ }
+
+ @Test
+ public void testIndexHalfStoreFileReader() throws Exception {
+ if (!localIndex)
+ return;
+
+ Connection conn1 = getConnection();
+ ConnectionQueryServices connectionQueryServices =
driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES);
+ HBaseAdmin admin = connectionQueryServices.getAdmin();
+ String tableName = "TBL_" + generateUniqueName();
+ String indexName = "IDX_" + generateUniqueName();
+ createBaseTable(conn1, tableName, "('e')");
+ conn1.createStatement().execute("CREATE "+(localIndex?"LOCAL":"")+"
INDEX " + indexName
+ + " ON " + tableName + "(v1)" + (localIndex?"":" SPLIT ON
('e')"));
+ conn1.createStatement().execute("UPSERT INTO "+tableName+"
values('b',1,2,4,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+"
values('f',1,2,3,'z')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+"
values('j',2,4,2,'a')");
+ conn1.createStatement().execute("UPSERT INTO "+tableName+"
values('q',3,1,1,'c')");
+ conn1.commit();
+
+
+ String query = "SELECT count(*) FROM " + tableName +" where v1<='z'";
+ ResultSet rs = conn1.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1));
+
+ TableName indexTable = TableName.valueOf(localIndex?tableName:
indexName);
+ admin.flush(indexTable);
+ boolean merged = false;
+ HTableInterface table =
connectionQueryServices.getTable(indexTable.getName());
+ // merge regions until 1 left
+ long numRegions = 0;
+ while (true) {
+ rs = conn1.createStatement().executeQuery(query);
+ assertTrue(rs.next());
+ assertEquals(4, rs.getInt(1)); //TODO this returns 5 sometimes
instead of 4, duplicate results?
+ try {
+ List<HRegionInfo> indexRegions =
admin.getTableRegions(indexTable);
+ numRegions = indexRegions.size();
+ if (numRegions==1) {
+ break;
+ }
+ if(!merged) {
+ List<HRegionInfo> regions =
+ admin.getTableRegions(indexTable);
+ LOGGER.info("Merging: " + regions.size());
+ admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
+ regions.get(1).getEncodedNameAsBytes(), false);
+ merged = true;
+ Threads.sleep(10000);
+ }
+ } catch (Exception ex) {
+ LOGGER.info(ex.getMessage());
+ }
+ long waitStartTime = System.currentTimeMillis();
+ // wait until merge happened
+ while (System.currentTimeMillis() - waitStartTime < 10000) {
+ List<HRegionInfo> regions = admin.getTableRegions(indexTable);
+ LOGGER.info("Waiting:" + regions.size());
+ if (regions.size() < numRegions) {
+ break;
+ }
+ Threads.sleep(1000);
+ }
+ SnapshotTestingUtils.waitForTableToBeOnline(BaseTest.getUtility(),
indexTable);
+ assertTrue("Index table should be online ",
admin.isTableAvailable(indexTable));
+ }
+ }
+
+ protected void createBaseTable(Connection conn, String tableName, String
splits)
+ throws SQLException {
+ String ddl =
+ "CREATE TABLE " + tableName + " (t_id VARCHAR NOT NULL,\n"
+ + "k1 INTEGER NOT NULL,\n" + "k2 INTEGER NOT NULL,\n"
+ "k3 INTEGER,\n"
+ + "v1 VARCHAR,\n" + "CONSTRAINT pk PRIMARY KEY (t_id,
k1, k2))\n" + (
+ tableDDLOptions != null ?
+ tableDDLOptions :
+ "") + (splits != null ? (" split on " +
splits) : "");
+ conn.createStatement().execute(ddl);
+ }
+}
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
index 8e11af6..d6e467a 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/index/MutableIndexIT.java
@@ -39,29 +39,15 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.HTableInterface;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HStore;
-import org.apache.hadoop.hbase.snapshot.SnapshotTestingUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
-import org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver;
import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
-import org.apache.phoenix.end2end.PartialScannerResultsDisabledIT;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixDatabaseMetaData;
-import org.apache.phoenix.query.BaseTest;
-import org.apache.phoenix.query.ConnectionQueryServices;
-import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
-import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.util.ByteUtil;
-import org.apache.phoenix.util.EnvironmentEdgeManager;
import org.apache.phoenix.util.IndexScrutiny;
-import org.apache.phoenix.util.IndexUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.QueryUtil;
@@ -71,15 +57,12 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import com.google.common.primitives.Doubles;
@RunWith(Parameterized.class)
public class MutableIndexIT extends ParallelStatsDisabledIT {
- private static final Logger LOGGER =
LoggerFactory.getLogger(MutableIndexIT.class);
protected final boolean localIndex;
private final String tableDDLOptions;
@@ -652,72 +635,6 @@ public class MutableIndexIT extends
ParallelStatsDisabledIT {
"CREATE " + (localIndex ? "LOCAL" : "")+" INDEX " + indexName + "
ON " + tableName + "(v1"+(isReverse?" DESC":"")+") include (k3)");
}
- @Test
- public void testIndexHalfStoreFileReader() throws Exception {
- if (!localIndex)
- return;
-
- Connection conn1 = getConnection();
- ConnectionQueryServices connectionQueryServices =
driver.getConnectionQueryServices(getUrl(), TestUtil.TEST_PROPERTIES);
- HBaseAdmin admin = connectionQueryServices.getAdmin();
- String tableName = "TBL_" + generateUniqueName();
- String indexName = "IDX_" + generateUniqueName();
- createBaseTable(conn1, tableName, "('e')");
- conn1.createStatement().execute("CREATE "+(localIndex?"LOCAL":"")+"
INDEX " + indexName + " ON " + tableName + "(v1)" + (localIndex?"":" SPLIT ON
('e')"));
- conn1.createStatement().execute("UPSERT INTO "+tableName+"
values('b',1,2,4,'z')");
- conn1.createStatement().execute("UPSERT INTO "+tableName+"
values('f',1,2,3,'z')");
- conn1.createStatement().execute("UPSERT INTO "+tableName+"
values('j',2,4,2,'a')");
- conn1.createStatement().execute("UPSERT INTO "+tableName+"
values('q',3,1,1,'c')");
- conn1.commit();
-
-
- String query = "SELECT count(*) FROM " + tableName +" where v1<='z'";
- ResultSet rs = conn1.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals(4, rs.getInt(1));
-
- TableName indexTable = TableName.valueOf(localIndex?tableName:
indexName);
- admin.flush(indexTable);
- boolean merged = false;
- HTableInterface table =
connectionQueryServices.getTable(indexTable.getName());
- // merge regions until 1 left
- long numRegions = 0;
- while (true) {
- rs = conn1.createStatement().executeQuery(query);
- assertTrue(rs.next());
- assertEquals(4, rs.getInt(1)); //TODO this returns 5 sometimes
instead of 4, duplicate results?
- try {
- List<HRegionInfo> indexRegions = admin.getTableRegions(indexTable);
- numRegions = indexRegions.size();
- if (numRegions==1) {
- break;
- }
- if(!merged) {
- List<HRegionInfo> regions =
- admin.getTableRegions(indexTable);
- LOGGER.info("Merging: " + regions.size());
- admin.mergeRegions(regions.get(0).getEncodedNameAsBytes(),
- regions.get(1).getEncodedNameAsBytes(), false);
- merged = true;
- Threads.sleep(10000);
- }
- } catch (Exception ex) {
- LOGGER.info(ex.getMessage());
- }
- long waitStartTime = System.currentTimeMillis();
- // wait until merge happened
- while (System.currentTimeMillis() - waitStartTime < 10000) {
- List<HRegionInfo> regions = admin.getTableRegions(indexTable);
- LOGGER.info("Waiting:" + regions.size());
- if (regions.size() < numRegions) {
- break;
- }
- Threads.sleep(1000);
- }
- SnapshotTestingUtils.waitForTableToBeOnline(BaseTest.getUtility(),
indexTable);
- assertTrue("Index table should be online ",
admin.isTableAvailable(indexTable));
- }
- }
private List<HRegionInfo> splitDuringScan(Connection conn1, String
tableName, String indexName, String[] strings, HBaseAdmin admin, boolean
isReverse)
@@ -832,99 +749,6 @@ public class MutableIndexIT extends
ParallelStatsDisabledIT {
}
}
- // Tests that if major compaction is run on a table with a disabled index,
- // deleted cells are kept
- // TODO: Move to a different test class?
- @Test
- public void testCompactDisabledIndex() throws Exception {
- if (localIndex || tableDDLOptions.contains("TRANSACTIONAL=true"))
- return;
-
- try (Connection conn = getConnection()) {
- String schemaName = generateUniqueName();
- String dataTableName = generateUniqueName() + "_DATA";
- String dataTableFullName = SchemaUtil.getTableName(schemaName,
dataTableName);
- String indexTableName = generateUniqueName() + "_IDX";
- String indexTableFullName = SchemaUtil.getTableName(schemaName,
indexTableName);
- conn.createStatement().execute(
- String.format(PartialScannerResultsDisabledIT.TEST_TABLE_DDL,
dataTableFullName));
-
conn.createStatement().execute(String.format(PartialScannerResultsDisabledIT.INDEX_1_DDL,
- indexTableName, dataTableFullName));
-
- //insert a row, and delete it
- PartialScannerResultsDisabledIT.writeSingleBatch(conn, 1, 1,
dataTableFullName);
- conn.createStatement().execute("DELETE FROM " + dataTableFullName);
- conn.commit();
-
- // disable the index, simulating an index write failure
- PhoenixConnection pConn = conn.unwrap(PhoenixConnection.class);
- IndexUtil.updateIndexState(pConn, indexTableFullName,
PIndexState.DISABLE,
- EnvironmentEdgeManager.currentTimeMillis());
-
- // major compaction should not remove the deleted row
- List<HRegion> regions =
getUtility().getHBaseCluster().getRegions(TableName.valueOf(dataTableFullName));
- HRegion hRegion = regions.get(0);
- hRegion.flush(true);
- HStore store = (HStore)
hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
- store.triggerMajorCompaction();
- store.compactRecentForTestingAssumingDefaultPolicy(1);
- HTableInterface dataHTI =
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName));
- assertEquals(1, TestUtil.getRawRowCount(dataHTI));
-
- // reenable the index
- IndexUtil.updateIndexState(pConn, indexTableFullName,
PIndexState.INACTIVE,
- EnvironmentEdgeManager.currentTimeMillis());
- IndexUtil.updateIndexState(pConn, indexTableFullName,
PIndexState.ACTIVE, 0L);
-
- // now major compaction should remove the deleted row
- store.triggerMajorCompaction();
- store.compactRecentForTestingAssumingDefaultPolicy(1);
- dataHTI =
conn.unwrap(PhoenixConnection.class).getQueryServices().getTable(Bytes.toBytes(dataTableFullName));
- assertEquals(0, TestUtil.getRawRowCount(dataHTI));
- }
- }
-
- // some tables (e.g. indexes on views) have UngroupedAgg coproc loaded, but
don't have a
- // corresponding row in syscat. This tests that compaction isn't blocked
- // TODO: Move to a different test class?
- @Test(timeout=120000)
- public void testCompactNonPhoenixTable() throws Exception {
- if (localIndex || tableDDLOptions.contains("TRANSACTIONAL=true"))
- return;
-
- try (Connection conn = getConnection()) {
- // create a vanilla HBase table (non-Phoenix)
- String randomTable = generateUniqueName();
- TableName hbaseTN = TableName.valueOf(randomTable);
- byte[] famBytes = Bytes.toBytes("fam");
- HTable hTable = getUtility().createTable(hbaseTN, famBytes);
- TestUtil.addCoprocessor(conn, randomTable,
UngroupedAggregateRegionObserver.class);
- Put put = new Put(Bytes.toBytes("row"));
- byte[] value = new byte[1];
- Bytes.random(value);
- put.add(famBytes, Bytes.toBytes("colQ"), value);
- hTable.put(put);
- hTable.flushCommits();
-
- // major compaction shouldn't cause a timeout or RS abort
- List<HRegion> regions =
getUtility().getHBaseCluster().getRegions(hbaseTN);
- HRegion hRegion = regions.get(0);
- hRegion.flush(true);
- HStore store = (HStore) hRegion.getStore(famBytes);
- store.triggerMajorCompaction();
- store.compactRecentForTestingAssumingDefaultPolicy(1);
-
- // we should be able to compact syscat itself as well
- regions =
getUtility().getHBaseCluster().getRegions(TableName.valueOf(PhoenixDatabaseMetaData.SYSTEM_CATALOG_NAME));
- hRegion = regions.get(0);
- hRegion.flush(true);
- store = (HStore)
hRegion.getStore(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES);
- store.triggerMajorCompaction();
- store.compactRecentForTestingAssumingDefaultPolicy(1);
- }
- }
-
-
@Test
public void testUpsertingDeletedRowShouldGiveProperDataWithIndexes() throws
Exception {
testUpsertingDeletedRowShouldGiveProperDataWithIndexes(false);
diff --git
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
index c49c61f..d2414c6 100644
---
a/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
+++
b/phoenix-core/src/it/java/org/apache/phoenix/end2end/join/HashJoinCacheIT.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.phoenix.cache.GlobalCache;
import org.apache.phoenix.cache.TenantCache;
import org.apache.phoenix.coprocessor.HashJoinCacheNotFoundException;
+import org.apache.phoenix.end2end.NeedsOwnMiniClusterTest;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.join.HashJoinInfo;
import org.apache.phoenix.query.QueryServices;
@@ -46,7 +47,9 @@ import org.apache.phoenix.util.PropertiesUtil;
import org.apache.phoenix.util.SchemaUtil;
import org.apache.phoenix.util.TestUtil;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
+@Category(NeedsOwnMiniClusterTest.class)
public class HashJoinCacheIT extends BaseJoinIT {
@Override