Repository: phoenix Updated Branches: refs/heads/4.8-HBase-0.98 d08ff399f -> f754ea525 refs/heads/4.8-HBase-1.0 c32d463f0 -> 9489ec5ea refs/heads/4.8-HBase-1.1 25a5550c9 -> b8540e344 refs/heads/4.8-HBase-1.2 03e100549 -> ce3533deb refs/heads/4.x-HBase-0.98 fb976aa56 -> bbb3e9119 refs/heads/4.x-HBase-1.1 b70d8f59f -> 6d1f47438 refs/heads/master fd6da35ee -> 36d500cb8
PHOENIX-3081 Consult RegionServer stopped/stopping state before logging error in StatisticsScanner Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/36d500cb Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/36d500cb Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/36d500cb Branch: refs/heads/master Commit: 36d500cb83209cd10ae87c5cce6648aab2866925 Parents: fd6da35 Author: Josh Elser <[email protected]> Authored: Mon Jul 18 16:24:22 2016 -0400 Committer: Josh Elser <[email protected]> Committed: Thu Sep 8 17:27:07 2016 -0400 ---------------------------------------------------------------------- .../phoenix/schema/stats/StatisticsScanner.java | 72 ++++++++-- .../schema/stats/StatisticsScannerTest.java | 144 +++++++++++++++++++ 2 files changed, 202 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/36d500cb/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index 082e833..736efc6 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.hadoop.hbase.regionserver.ScannerContext; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -49,12 +50,14 @@ public class StatisticsScanner implements InternalScanner { private StatisticsCollector tracker; private ImmutableBytesPtr family; private final Configuration config; + private final RegionServerServices regionServerServices; public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter stats, RegionCoprocessorEnvironment env, InternalScanner delegate, ImmutableBytesPtr family) { this.tracker = tracker; this.statsWriter = stats; this.delegate = delegate; + this.regionServerServices = env.getRegionServerServices(); this.region = env.getRegion(); this.family = family; this.config = env.getConfiguration(); @@ -89,9 +92,13 @@ public class StatisticsScanner implements InternalScanner { @Override public void close() throws IOException { - boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC); - StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config); - StatisticsScannerCallable callable = new StatisticsScannerCallable(); + boolean async = getConfig().getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC); + StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config); + StatisticsScannerCallable callable = createCallable(); + if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) { + LOG.debug("Not updating table statistics because the server is stopping/stopped"); + return; + } if (!async) { callable.call(); } else { @@ -99,12 +106,45 @@ public class StatisticsScanner implements InternalScanner { } } - private class StatisticsScannerCallable implements Callable<Void> { + // VisibleForTesting + StatisticsCollectionRunTracker getStatsCollectionRunTracker(Configuration c) { + return StatisticsCollectionRunTracker.getInstance(c); + } + + Configuration getConfig() { + return config; + } + + StatisticsWriter getStatisticsWriter() { + return statsWriter; + } + + RegionServerServices getRegionServerServices() { + return regionServerServices; + } + + Region getRegion() { + return region; + } + + StatisticsScannerCallable createCallable() { + return new StatisticsScannerCallable(); + } + + StatisticsCollector getTracker() { + return tracker; + } + + InternalScanner getDelegate() { + return delegate; + } + + class StatisticsScannerCallable implements Callable<Void> { @Override public Void call() throws IOException { IOException toThrow = null; - StatisticsCollectionRunTracker collectionTracker = StatisticsCollectionRunTracker.getInstance(config); - final HRegionInfo regionInfo = region.getRegionInfo(); + StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config); + final HRegionInfo regionInfo = getRegion().getRegionInfo(); try { // update the statistics table // Just verify if this if fine @@ -114,32 +154,36 @@ public class StatisticsScanner implements InternalScanner { LOG.debug("Deleting the stats for the region " + regionInfo.getRegionNameAsString() + " as part of major compaction"); } - statsWriter.deleteStats(region, tracker, family, mutations); + getStatisticsWriter().deleteStats(region, tracker, family, mutations); if (LOG.isDebugEnabled()) { LOG.debug("Adding new stats for the region " + regionInfo.getRegionNameAsString() + " as part of major compaction"); } - statsWriter.addStats(tracker, family, mutations); + getStatisticsWriter().addStats(tracker, family, mutations); if (LOG.isDebugEnabled()) { LOG.debug("Committing new stats for the region " + regionInfo.getRegionNameAsString() + " as part of major compaction"); } - statsWriter.commitStats(mutations, tracker); + getStatisticsWriter().commitStats(mutations, tracker); } catch (IOException e) { - LOG.error("Failed to update statistics table!", e); - toThrow = e; + if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) { + LOG.debug("Ignoring error updating statistics because region is closing/closed"); + } else { + LOG.error("Failed to update statistics table!", e); + toThrow = e; + } } finally { try { collectionTracker.removeCompactingRegion(regionInfo); - statsWriter.close();// close the writer - tracker.close();// close the tracker + getStatisticsWriter().close();// close the writer + getTracker().close();// close the tracker } catch (IOException e) { if (toThrow == null) toThrow = e; LOG.error("Error while closing the stats table", e); } finally { // close the delegate scanner try { - delegate.close(); + getDelegate().close(); } catch (IOException e) { if (toThrow == null) toThrow = e; LOG.error("Error while closing the scanner", e); http://git-wip-us.apache.org/repos/asf/phoenix/blob/36d500cb/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java new file mode 100644 index 0000000..888f09a --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stats; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.schema.stats.StatisticsScanner.StatisticsScannerCallable; +import org.junit.Before; +import org.junit.Test; + +/** + * Test to verify that we don't try to update stats when a RS is stopping. + */ +public class StatisticsScannerTest { + + private Region region; + private RegionServerServices rsServices; + private StatisticsWriter statsWriter; + private StatisticsScannerCallable callable; + private StatisticsCollectionRunTracker runTracker; + private StatisticsScanner mockScanner; + private StatisticsCollector tracker; + private InternalScanner delegate; + private HRegionInfo regionInfo; + + private Configuration config; + + @Before + public void setupMocks() throws Exception { + this.config = new Configuration(false); + + // Create all of the mocks + this.region = mock(Region.class); + this.rsServices = mock(RegionServerServices.class); + this.statsWriter = mock(StatisticsWriter.class); + this.callable = mock(StatisticsScannerCallable.class); + this.runTracker = mock(StatisticsCollectionRunTracker.class); + this.mockScanner = mock(StatisticsScanner.class); + this.tracker = mock(StatisticsCollector.class); + this.delegate = mock(InternalScanner.class); + this.regionInfo = mock(HRegionInfo.class); + + // Wire up the mocks to the mock StatisticsScanner + when(mockScanner.getStatisticsWriter()).thenReturn(statsWriter); + when(mockScanner.getRegionServerServices()).thenReturn(rsServices); + when(mockScanner.createCallable()).thenReturn(callable); + when(mockScanner.getStatsCollectionRunTracker(any(Configuration.class))).thenReturn(runTracker); + when(mockScanner.getRegion()).thenReturn(region); + when(mockScanner.getConfig()).thenReturn(config); + when(mockScanner.getTracker()).thenReturn(tracker); + when(mockScanner.getDelegate()).thenReturn(delegate); + + // Wire up the HRegionInfo mock to the Region mock + when(region.getRegionInfo()).thenReturn(regionInfo); + + // Always call close() on the mock StatisticsScanner + doCallRealMethod().when(mockScanner).close(); + } + + @Test + public void testCheckRegionServerStoppingOnClose() throws Exception { + when(rsServices.isStopping()).thenReturn(true); + when(rsServices.isStopped()).thenReturn(false); + + mockScanner.close(); + + verify(rsServices).isStopping(); + verify(callable, never()).call(); + verify(runTracker, never()).runTask(callable); + } + + @Test + public void testCheckRegionServerStoppedOnClose() throws Exception { + when(rsServices.isStopping()).thenReturn(false); + when(rsServices.isStopped()).thenReturn(true); + + mockScanner.close(); + + verify(rsServices).isStopping(); + verify(rsServices).isStopped(); + verify(callable, never()).call(); + verify(runTracker, never()).runTask(callable); + } + + @SuppressWarnings("unchecked") + @Test + public void testCheckRegionServerStoppingOnException() throws Exception { + StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable(); + doThrow(new IOException()).when(statsWriter).deleteStats(any(Region.class), any(StatisticsCollector.class), + any(ImmutableBytesPtr.class), any(List.class)); + when(rsServices.isStopping()).thenReturn(true); + when(rsServices.isStopped()).thenReturn(false); + + // Should not throw an exception + realCallable.call(); + + verify(rsServices).isStopping(); + } + + @SuppressWarnings("unchecked") + @Test + public void testCheckRegionServerStoppedOnException() throws Exception { + StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable(); + doThrow(new IOException()).when(statsWriter).deleteStats(any(Region.class), any(StatisticsCollector.class), + any(ImmutableBytesPtr.class), any(List.class)); + when(rsServices.isStopping()).thenReturn(false); + when(rsServices.isStopped()).thenReturn(true); + + // Should not throw an exception + realCallable.call(); + + verify(rsServices).isStopping(); + verify(rsServices).isStopped(); + } +}
