This is an automated email from the ASF dual-hosted git repository.

kadir pushed a commit to branch 4.x in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/4.x by this push: new 4fc394c PHOENIX-6181 Addendum 4fc394c is described below commit 4fc394c3d9c92cbefa20800dd642df20524933a4 Author: Kadir Ozdemir <kozde...@salesforce.com> AuthorDate: Fri Oct 16 20:11:53 2020 -0700 PHOENIX-6181 Addendum --- .../end2end/ConcurrentMutationsExtendedIT.java | 6 ---- .../coprocessor/GlobalIndexRegionScanner.java | 5 +--- .../phoenix/coprocessor/IndexerRegionScanner.java | 5 ---- .../UngroupedAggregateRegionObserver.java | 35 +++++++++++----------- 4 files changed, 18 insertions(+), 33 deletions(-) diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java index 0a52c66..c2e0dc4 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ConcurrentMutationsExtendedIT.java @@ -22,7 +22,6 @@ import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.coprocessor.ObserverContext; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.SimpleRegionObserver; @@ -33,7 +32,6 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.mapreduce.index.IndexTool; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.util.*; -import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -45,7 +43,6 @@ import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; -import static org.apache.phoenix.mapreduce.PhoenixJobCounters.INPUT_RECORDS; import static 
org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_INVALID_INDEX_ROW_COUNT; @@ -53,15 +50,12 @@ import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFT import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.AFTER_REBUILD_VALID_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_INVALID_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_BEYOND_MAXLOOKBACK_MISSING_INDEX_ROW_COUNT; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_EXPIRED_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_INVALID_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_MISSING_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_OLD_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNKNOWN_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_UNVERIFIED_INDEX_ROW_COUNT; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.BEFORE_REBUILD_VALID_INDEX_ROW_COUNT; import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.REBUILT_INDEX_ROW_COUNT; -import static org.apache.phoenix.mapreduce.index.PhoenixIndexToolJobCounters.SCANNED_DATA_ROW_COUNT; import static org.junit.Assert.*; @RunWith(RunUntilFailure.class) @Category(NeedsOwnMiniClusterTest.class) diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java index e1eb9ff..5005339 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GlobalIndexRegionScanner.java @@ -66,7 +66,6 @@ import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.types.PVarbinary; import org.apache.phoenix.util.EnvironmentEdgeManager; import org.apache.phoenix.util.IndexUtil; -import org.apache.phoenix.util.ScanUtil; import org.apache.phoenix.util.ServerUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -115,7 +114,6 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { public static final String PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = "phoenix.index.mr.log.beyond.max.lookback.errors"; public static final boolean DEFAULT_PHOENIX_INDEX_MR_LOG_BEYOND_MAX_LOOKBACK_ERRORS = false; - private static boolean ignoreIndexRebuildForTesting = false; protected final UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver; protected IndexTool.IndexDisableLoggingType disableLoggingVerifyType = IndexTool.IndexDisableLoggingType.NONE; @@ -986,8 +984,7 @@ public abstract class GlobalIndexRegionScanner extends BaseRegionScanner { indexUpdates.addAll(mutationList); batchSize += mutationList.size(); if (batchSize >= maxBatchSize) { - ungroupedAggregateRegionObserver.checkForRegionClosing(); - indexHTable.batch(indexUpdates); + commitBatch(indexUpdates); batchSize = 0; indexUpdates = new ArrayList<Mutation>(maxBatchSize); } diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java index 4181aca..7f61be3 100644 --- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/IndexerRegionScanner.java @@ -35,11 +35,8 @@ import java.util.concurrent.Future; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; - -import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; - import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; @@ -65,7 +62,6 @@ import org.apache.phoenix.hbase.index.parallel.ThreadPoolBuilder; import org.apache.phoenix.hbase.index.parallel.ThreadPoolManager; import org.apache.phoenix.hbase.index.parallel.WaitForCompletionTaskRunner; import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; - import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.mapreduce.index.IndexTool; @@ -87,7 +83,6 @@ public class IndexerRegionScanner extends GlobalIndexRegionScanner { protected Map<byte[], Put> indexKeyToDataPutMap; protected UngroupedAggregateRegionObserver.MutationList mutations; private boolean partialRebuild = false; - private UngroupedAggregateRegionObserver ungroupedAggregateRegionObserver; IndexerRegionScanner (final RegionScanner innerScanner, final Region region, final Scan scan, final RegionCoprocessorEnvironment env, diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java index b3cd21b..7b66364 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java @@ -99,6 +99,7 @@ import 
org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; import org.apache.phoenix.hbase.index.util.KeyValueBuilder; import org.apache.phoenix.hbase.index.write.IndexWriter; +import org.apache.phoenix.index.GlobalIndexChecker; import org.apache.phoenix.index.IndexMaintainer; import org.apache.phoenix.index.PhoenixIndexCodec; import org.apache.phoenix.index.PhoenixIndexFailurePolicy; @@ -1072,6 +1073,20 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } } + private RegionScanner getRegionScanner(final RegionScanner innerScanner, final Region region, final Scan scan, + final RegionCoprocessorEnvironment env, final boolean oldCoproc) + throws IOException { + if (oldCoproc) { + return new IndexerRegionScanner(innerScanner, region, scan, env, this); + } else { + if (region.getTableDesc().hasCoprocessor(GlobalIndexChecker.class.getCanonicalName())) { + return new IndexRepairRegionScanner(innerScanner, region, scan, env, this); + } else { + return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this); + } + } + } + private RegionScanner rebuildIndices(final RegionScanner innerScanner, final Region region, final Scan scan, final RegionCoprocessorEnvironment env) throws IOException { boolean oldCoproc = region.getTableDesc().hasCoprocessor(Indexer.class.getCanonicalName()); @@ -1101,25 +1116,9 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver } innerScanner.close(); RegionScanner scanner = region.getScanner(rawScan); - if (oldCoproc) { - return new IndexerRegionScanner(scanner, region, scan, env, this); - } else { - if (region.getTableDesc().hasCoprocessor(IndexRegionObserver.class.getCanonicalName())) { - return new IndexRebuildRegionScanner(scanner, region, scan, env, this); - } else { - return new IndexRepairRegionScanner(scanner, region, scan, env, this); - } - } - } - if (oldCoproc) { - return new 
IndexerRegionScanner(innerScanner, region, scan, env, this); - } else { - if (region.getTableDesc().hasCoprocessor(IndexRegionObserver.class.getCanonicalName())) { - return new IndexRebuildRegionScanner(innerScanner, region, scan, env, this); - } else { - return new IndexRepairRegionScanner(innerScanner, region, scan, env, this); - } + return getRegionScanner(scanner, region, scan, env, oldCoproc); } + return getRegionScanner(innerScanner, region, scan, env, oldCoproc); } private RegionScanner collectStats(final RegionScanner innerScanner, StatisticsCollector stats,