Repository: hbase Updated Branches: refs/heads/0.98 76756e1c2 -> 7995d9b9f refs/heads/branch-1 baeabeab8 -> 2d83d5545 refs/heads/master 0a46a638d -> 6856e4533
http://git-wip-us.apache.org/repos/asf/hbase/blob/6856e453/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java index 5052f2a..6efe8eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerCoprocessorHost.java @@ -54,170 +54,120 @@ public class RegionServerCoprocessorHost extends } public void preStop(String message) throws IOException { - ObserverContext<RegionServerCoprocessorEnvironment> ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).preStopRegionServer(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { + oserver.preStopRegionServer(ctx); } - // invoke coprocessor stop method - shutdown(env); - } + @Override + public void postEnvCall(RegionServerEnvironment env) { + // invoke coprocessor stop method + shutdown(env); + } + }); } public boolean preMerge(final HRegion regionA, final HRegion regionB) throws IOException { - boolean bypass = false; - ObserverContext<RegionServerCoprocessorEnvironment> ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).preMerge(ctx, regionA, regionB); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { + oserver.preMerge(ctx, regionA, regionB); } - } - return bypass; + }); } public void postMerge(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion) throws IOException { - ObserverContext<RegionServerCoprocessorEnvironment> ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).postMerge(ctx, regionA, regionB, mergedRegion); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { + oserver.postMerge(ctx, regionA, regionB, mergedRegion); } - } + }); } public boolean preMergeCommit(final HRegion regionA, final HRegion regionB, final @MetaMutationAnnotation List<Mutation> metaEntries) throws IOException { - boolean bypass = false; - ObserverContext<RegionServerCoprocessorEnvironment> ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).preMergeCommit(ctx, regionA, regionB, - metaEntries); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { + oserver.preMergeCommit(ctx, regionA, regionB, metaEntries); } - } - return bypass; + }); } public void postMergeCommit(final HRegion regionA, final HRegion regionB, final HRegion mergedRegion) throws IOException { - ObserverContext<RegionServerCoprocessorEnvironment> ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).postMergeCommit(ctx, regionA, regionB, - mergedRegion); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { + oserver.postMergeCommit(ctx, regionA, regionB, mergedRegion); } - } + }); } public void preRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException { - ObserverContext<RegionServerCoprocessorEnvironment> ctx = null; - for (RegionServerEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).preRollBackMerge(ctx, regionA, regionB); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { + oserver.preRollBackMerge(ctx, regionA, regionB); } - } + }); } public void postRollBackMerge(final HRegion regionA, final HRegion regionB) throws IOException { - ObserverContext<RegionServerCoprocessorEnvironment> ctx = null; - for (RegionServerEnvironment env : coprocessors) { + execOperation(coprocessors.isEmpty() ? null : new CoprocessorOperation() { + @Override + public void call(RegionServerObserver oserver, + ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException { + oserver.postRollBackMerge(ctx, regionA, regionB); + } + }); + } + + private static abstract class CoprocessorOperation + extends ObserverContext<RegionServerCoprocessorEnvironment> { + public CoprocessorOperation() { + } + + public abstract void call(RegionServerObserver oserver, + ObserverContext<RegionServerCoprocessorEnvironment> ctx) throws IOException; + + public void postEnvCall(RegionServerEnvironment env) { + } + } + + private boolean execOperation(final CoprocessorOperation ctx) throws IOException { + if (ctx == null) return false; + + boolean bypass = false; + for (RegionServerEnvironment env: coprocessors) { if (env.getInstance() instanceof RegionServerObserver) { - ctx = ObserverContext.createAndPrepare(env, ctx); + ctx.prepare(env); Thread currentThread = Thread.currentThread(); ClassLoader cl = currentThread.getContextClassLoader(); try { currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionServerObserver) env.getInstance()).postRollBackMerge(ctx, regionA, regionB); + ctx.call((RegionServerObserver)env.getInstance(), ctx); } catch (Throwable e) { handleCoprocessorThrowable(env, e); } finally { currentThread.setContextClassLoader(cl); } + bypass |= ctx.shouldBypass(); if (ctx.shouldComplete()) { break; } } + ctx.postEnvCall(env); } + return bypass; } /**
