This is an automated email from the ASF dual-hosted git repository. stack pushed a commit to branch branch-2 in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 58b0e0f3fcfd7ca53fd12692b588ce1b3fe61057 Author: stack <[email protected]> AuthorDate: Thu Jan 9 15:23:36 2020 -0800 Revert "HBASE-23601: OutputSink.WriterThread exception gets stuck and repeated indefinietly (#956)" This reverts commit e78ce468d8e37df49151c16c39a9607258c3c096. --- .../RegionReplicaReplicationEndpoint.java | 1 - .../org/apache/hadoop/hbase/wal/OutputSink.java | 20 +--- .../org/apache/hadoop/hbase/wal/WALSplitter.java | 1 - .../hadoop/hbase/wal/TestOutputSinkWriter.java | 126 --------------------- 4 files changed, 1 insertion(+), 147 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index cead808..60f693a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -249,7 +249,6 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint { } catch (IOException e) { LOG.warn("Received IOException while trying to replicate" + StringUtils.stringifyException(e)); - outputSink.restartWriterThreadsIfNeeded(); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java index de62c4d..b589134 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java @@ -89,19 +89,6 @@ public abstract class OutputSink { } } - public synchronized void restartWriterThreadsIfNeeded() { - for(int i = 0; i< writerThreads.size(); i++){ - WriterThread t = writerThreads.get(i); - if (!t.isAlive()){ - String threadName = t.getName(); - LOG.debug("Replacing dead thread: " + threadName); - WriterThread newThread = new WriterThread(controller, entryBuffers, this, threadName); - newThread.start(); - writerThreads.set(i, newThread); - } - } - } - /** * Wait for writer threads to dump all info to the sink * @@ -177,12 +164,7 @@ public abstract class OutputSink { WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i) { - this(controller, entryBuffers, sink, Thread.currentThread().getName() + "-Writer-" + i); - } - - WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, - OutputSink sink, String threadName) { - super(threadName); + super(Thread.currentThread().getName() + "-Writer-" + i); this.controller = controller; this.entryBuffers = entryBuffers; outputSink = sink; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 6b75f1d..d7bbd07 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -475,7 +475,6 @@ public class WALSplitter { if (thrown == null) { return; } - this.thrown.set(null); if (thrown instanceof IOException) { throw new IOException(thrown); } else { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestOutputSinkWriter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestOutputSinkWriter.java deleted file mode 100644 index 5249835..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestOutputSinkWriter.java +++ /dev/null @@ -1,126 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.wal; - -import java.io.IOException; -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.testclassification.MediumTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category({ RegionServerTests.class, MediumTests.class }) -public class TestOutputSinkWriter { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass( - TestOutputSinkWriter.class); - - @Test - public void testExeptionHandling() throws IOException, InterruptedException { - WALSplitter.PipelineController controller = new WALSplitter.PipelineController(); - BrokenEntryBuffers entryBuffers = new BrokenEntryBuffers(controller, 2000); - OutputSink sink = new OutputSink(controller, entryBuffers, 1) { - - @Override public List<Path> finishWritingAndClose() throws IOException { - return null; - } - - @Override public Map<byte[],Long> getOutputCounts() { - return null; - } - - @Override public int getNumberOfRecoveredRegions() { - return 0; - } - - @Override public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException { - - } - - @Override public boolean keepRegionEvent(WAL.Entry entry) { - return false; - } - }; - - //start the Writer thread and give it time trow the exception - sink.startWriterThreads(); - Thread.sleep(1000L); - - //make sure the exception is stored - try { - controller.checkForErrors(); - Assert.fail(); - } - catch (RuntimeException re){ - Assert.assertTrue(true); - } - - sink.restartWriterThreadsIfNeeded(); - - //after the check the stored exception should be gone - try { - controller.checkForErrors(); - } - catch (RuntimeException re){ - Assert.fail(); - } - - //prep another exception and wait for it to be thrown - entryBuffers.setThrowError(true); - Thread.sleep(1000L); - - //make sure the exception is stored - try { - controller.checkForErrors(); - Assert.fail(); - } - catch (RuntimeException re){ - Assert.assertTrue(true); - } - } - - static class BrokenEntryBuffers extends EntryBuffers{ - boolean throwError = true; - - public BrokenEntryBuffers(WALSplitter.PipelineController controller, long maxHeapUsage) { - super(controller, maxHeapUsage); - } - - @Override - synchronized WALSplitter.RegionEntryBuffer getChunkToWrite() { - //This just emulates something going wrong with in the Writer - if(throwError){ - throwError = false; - throw new RuntimeException("testing"); - } - return null; - } - - public void setThrowError(boolean newValue){ - throwError = newValue; - } - }; -}
