PHOENIX-3081 Consult RegionServer stopped/stopping state before logging error in StatisticsScanner
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/f754ea52 Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/f754ea52 Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/f754ea52 Branch: refs/heads/4.8-HBase-0.98 Commit: f754ea52551803c0ccb04d5fb71c575f79de7a92 Parents: d08ff39 Author: Josh Elser <[email protected]> Authored: Mon Jul 18 16:24:22 2016 -0400 Committer: Josh Elser <[email protected]> Committed: Thu Sep 8 18:31:22 2016 -0400 ---------------------------------------------------------------------- .../phoenix/schema/stats/StatisticsScanner.java | 91 +++++++++--- .../schema/stats/StatisticsScannerTest.java | 144 +++++++++++++++++++ 2 files changed, 212 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/f754ea52/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java index a9ce275..71be072 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/schema/stats/StatisticsScanner.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; @@ -48,12 +49,14 @@ public class StatisticsScanner implements InternalScanner { private StatisticsCollector tracker; private ImmutableBytesPtr family; private final Configuration config; + private final RegionServerServices regionServerServices; public StatisticsScanner(StatisticsCollector tracker, StatisticsWriter statsWriter, RegionCoprocessorEnvironment env, InternalScanner delegate, ImmutableBytesPtr family) { this.tracker = tracker; this.statsWriter = statsWriter; this.delegate = delegate; + this.regionServerServices = env.getRegionServerServices(); this.region = env.getRegion(); this.config = env.getConfiguration(); this.family = family; @@ -86,12 +89,61 @@ public class StatisticsScanner implements InternalScanner { } } - private class StatisticsScannerCallable implements Callable<Void> { + @Override + public void close() throws IOException { + boolean async = getConfig().getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC); + StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config); + StatisticsScannerCallable callable = createCallable(); + if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) { + LOG.debug("Not updating table statistics because the server is stopping/stopped"); + return; + } + if (!async) { + callable.call(); + } else { + collectionTracker.runTask(callable); + } + } + + // VisibleForTesting + StatisticsCollectionRunTracker getStatsCollectionRunTracker(Configuration c) { + return StatisticsCollectionRunTracker.getInstance(c); + } + + Configuration getConfig() { + return config; + } + + StatisticsWriter getStatisticsWriter() { + return statsWriter; + } + + RegionServerServices getRegionServerServices() { + return regionServerServices; + } + + HRegion getRegion() { + return region; + } + + StatisticsScannerCallable createCallable() { + return new StatisticsScannerCallable(); + } + + StatisticsCollector getTracker() { + return tracker; + } + + InternalScanner getDelegate() { + return delegate; + } + + class StatisticsScannerCallable implements Callable<Void> { @Override public Void call() throws IOException { IOException toThrow = null; - StatisticsCollectionRunTracker statsRunState = - StatisticsCollectionRunTracker.getInstance(config); + StatisticsCollectionRunTracker collectionTracker = getStatsCollectionRunTracker(config); + final HRegion region = getRegion(); try { // update the statistics table // Just verify if this if fine @@ -100,32 +152,36 @@ public class StatisticsScanner implements InternalScanner { LOG.debug("Deleting the stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); } - statsWriter.deleteStats(region, tracker, family, mutations); + getStatisticsWriter().deleteStats(region, tracker, family, mutations); if (LOG.isDebugEnabled()) { LOG.debug("Adding new stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); } - statsWriter.addStats(tracker, family, mutations); + getStatisticsWriter().addStats(tracker, family, mutations); if (LOG.isDebugEnabled()) { LOG.debug("Committing new stats for the region " + region.getRegionNameAsString() + " as part of major compaction"); } - statsWriter.commitStats(mutations, tracker); + getStatisticsWriter().commitStats(mutations, tracker); } catch (IOException e) { - LOG.error("Failed to update statistics table!", e); - toThrow = e; + if (getRegionServerServices().isStopping() || getRegionServerServices().isStopped()) { + LOG.debug("Ignoring error updating statistics because region is closing/closed"); + } else { + LOG.error("Failed to update statistics table!", e); + toThrow = e; + } } finally { try { - statsRunState.removeCompactingRegion(region.getRegionInfo()); - statsWriter.close(); - tracker.close();// close the tracker + collectionTracker.removeCompactingRegion(region.getRegionInfo()); + getStatisticsWriter().close();// close the writer + getTracker().close();// close the tracker } catch (IOException e) { if (toThrow == null) toThrow = e; LOG.error("Error while closing the stats table", e); } finally { // close the delegate scanner try { - delegate.close(); + getDelegate().close(); } catch (IOException e) { if (toThrow == null) toThrow = e; LOG.error("Error while closing the scanner", e); @@ -137,15 +193,4 @@ public class StatisticsScanner implements InternalScanner { return null; } } - - @Override - public void close() throws IOException { - boolean async = config.getBoolean(COMMIT_STATS_ASYNC, DEFAULT_COMMIT_STATS_ASYNC); - StatisticsScannerCallable callable = new StatisticsScannerCallable(); - if (!async) { - callable.call(); - } else { - StatisticsCollectionRunTracker.getInstance(config).runTask(callable); - } - } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/f754ea52/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java new file mode 100644 index 0000000..685b4b6 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/schema/stats/StatisticsScannerTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to you under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.schema.stats; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr; +import org.apache.phoenix.schema.stats.StatisticsScanner.StatisticsScannerCallable; +import org.junit.Before; +import org.junit.Test; + +/** + * Test to verify that we don't try to update stats when a RS is stopping. + */ +public class StatisticsScannerTest { + + private HRegion region; + private RegionServerServices rsServices; + private StatisticsWriter statsWriter; + private StatisticsScannerCallable callable; + private StatisticsCollectionRunTracker runTracker; + private StatisticsScanner mockScanner; + private StatisticsCollector tracker; + private InternalScanner delegate; + private HRegionInfo regionInfo; + + private Configuration config; + + @Before + public void setupMocks() throws Exception { + this.config = new Configuration(false); + + // Create all of the mocks + this.region = mock(HRegion.class); + this.rsServices = mock(RegionServerServices.class); + this.statsWriter = mock(StatisticsWriter.class); + this.callable = mock(StatisticsScannerCallable.class); + this.runTracker = mock(StatisticsCollectionRunTracker.class); + this.mockScanner = mock(StatisticsScanner.class); + this.tracker = mock(StatisticsCollector.class); + this.delegate = mock(InternalScanner.class); + this.regionInfo = mock(HRegionInfo.class); + + // Wire up the mocks to the mock StatisticsScanner + when(mockScanner.getStatisticsWriter()).thenReturn(statsWriter); + when(mockScanner.getRegionServerServices()).thenReturn(rsServices); + when(mockScanner.createCallable()).thenReturn(callable); + when(mockScanner.getStatsCollectionRunTracker(any(Configuration.class))).thenReturn(runTracker); + when(mockScanner.getRegion()).thenReturn(region); + when(mockScanner.getConfig()).thenReturn(config); + when(mockScanner.getTracker()).thenReturn(tracker); + when(mockScanner.getDelegate()).thenReturn(delegate); + + // Wire up the HRegionInfo mock to the Region mock + when(region.getRegionInfo()).thenReturn(regionInfo); + + // Always call close() on the mock StatisticsScanner + doCallRealMethod().when(mockScanner).close(); + } + + @Test + public void testCheckRegionServerStoppingOnClose() throws Exception { + when(rsServices.isStopping()).thenReturn(true); + when(rsServices.isStopped()).thenReturn(false); + + mockScanner.close(); + + verify(rsServices).isStopping(); + verify(callable, never()).call(); + verify(runTracker, never()).runTask(callable); + } + + @Test + public void testCheckRegionServerStoppedOnClose() throws Exception { + when(rsServices.isStopping()).thenReturn(false); + when(rsServices.isStopped()).thenReturn(true); + + mockScanner.close(); + + verify(rsServices).isStopping(); + verify(rsServices).isStopped(); + verify(callable, never()).call(); + verify(runTracker, never()).runTask(callable); + } + + @SuppressWarnings("unchecked") + @Test + public void testCheckRegionServerStoppingOnException() throws Exception { + StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable(); + doThrow(new IOException()).when(statsWriter).deleteStats(any(HRegion.class), any(StatisticsCollector.class), + any(ImmutableBytesPtr.class), any(List.class)); + when(rsServices.isStopping()).thenReturn(true); + when(rsServices.isStopped()).thenReturn(false); + + // Should not throw an exception + realCallable.call(); + + verify(rsServices).isStopping(); + } + + @SuppressWarnings("unchecked") + @Test + public void testCheckRegionServerStoppedOnException() throws Exception { + StatisticsScannerCallable realCallable = mockScanner.new StatisticsScannerCallable(); + doThrow(new IOException()).when(statsWriter).deleteStats(any(HRegion.class), any(StatisticsCollector.class), + any(ImmutableBytesPtr.class), any(List.class)); + when(rsServices.isStopping()).thenReturn(false); + when(rsServices.isStopped()).thenReturn(true); + + // Should not throw an exception + realCallable.call(); + + verify(rsServices).isStopping(); + verify(rsServices).isStopped(); + } +}
