http://git-wip-us.apache.org/repos/asf/hbase/blob/7995d9b9/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java index 6329d47..0ef863d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java @@ -309,83 +309,46 @@ public class RegionCoprocessorHost * @throws IOException Signals that an I/O exception has occurred. */ public void preOpen() throws IOException { - - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preOpen(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preOpen(ctx); } - } - + }); } /** * Invoked after a region open */ public void postOpen() { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postOpen(ctx); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; + try { + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postOpen(ctx); } - } + }); + } catch (IOException e) { + LOG.warn(e); } - } /** * Invoked after log replay on region */ public void postLogReplay() { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postLogReplay(ctx); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; + try { + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postLogReplay(ctx); } - } + }); + } catch (IOException e) { + LOG.warn(e); } } @@ -394,25 +357,13 @@ public class RegionCoprocessorHost * @param abortRequested true if the server is aborting */ public void preClose(final boolean abortRequested) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preClose(ctx, abortRequested); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); + execOperation(false, new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preClose(ctx, abortRequested); } - } - + }); } /** @@ -420,26 +371,20 @@ public class RegionCoprocessorHost * @param abortRequested true if the server is aborting */ public void postClose(final boolean abortRequested) { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postClose(ctx, abortRequested); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env, e); - } finally { - currentThread.setContextClassLoader(cl); + try { + execOperation(false, new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postClose(ctx, abortRequested); } - env.offerExecutionLatency(System.nanoTime() - startTime); - } - shutdown(env); + public void postEnvCall(RegionEnvironment env) { + shutdown(env); + } + }); + } catch (IOException e) { + LOG.warn(e); } - } /** @@ -449,30 +394,15 @@ public class RegionCoprocessorHost public InternalScanner preCompactScannerOpen(final Store store, final List<StoreFileScanner> scanners, final ScanType scanType, final long earliestPutTs, final CompactionRequest request) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - InternalScanner s = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, - scanners, scanType, earliestPutTs, s, request); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(null, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preCompactScannerOpen(ctx, store, scanners, scanType, + earliestPutTs, getResult(), request)); } - } - return s; + }); } /** @@ -486,31 +416,13 @@ public class RegionCoprocessorHost */ public boolean preCompactSelection(final Store store, final List<StoreFile> candidates, final CompactionRequest request) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - boolean bypass = false; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates, - request); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preCompactSelection(ctx, store, candidates, request); } - } - return bypass; + }); } /** @@ -522,29 +434,17 @@ public class RegionCoprocessorHost */ public void postCompactSelection(final Store store, final ImmutableList<StoreFile> selected, final CompactionRequest request) { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected, - request); - } catch (Throwable e) { - handleCoprocessorThrowableNoRethrow(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; + try { + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postCompactSelection(ctx, store, selected, request); } - } + }); + } catch (IOException e) { + LOG.warn(e); } - } /** @@ -557,32 +457,14 @@ public class RegionCoprocessorHost */ public InternalScanner preCompact(final Store store, final InternalScanner scanner, final ScanType scanType, final CompactionRequest request) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - boolean bypass = false; - InternalScanner s = scanner; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preCompact(ctx, store, s, scanType, - request); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(false, scanner, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preCompact(ctx, store, getResult(), scanType, request)); } - } - return bypass ? null : s; + }); } /** @@ -594,58 +476,29 @@ public class RegionCoprocessorHost */ public void postCompact(final Store store, final StoreFile resultFile, final CompactionRequest request) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postCompact(ctx, store, resultFile, request); } - } + }); } /** * Invoked before a memstore flush * @throws IOException */ - public InternalScanner preFlush(final Store store, final InternalScanner scanner) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - boolean bypass = false; - InternalScanner s = scanner; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver)env.getInstance()).preFlush(ctx, store, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env,e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + public InternalScanner preFlush(final Store store, final InternalScanner scanner) + throws IOException { + return execOperationWithResult(false, scanner, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preFlush(ctx, store, getResult())); } - } - return bypass ? null : s; + }); } /** @@ -653,27 +506,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preFlush() throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preFlush(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preFlush(ctx); } - } + }); } /** @@ -683,30 +522,14 @@ public class RegionCoprocessorHost */ public InternalScanner preFlushScannerOpen(final Store store, final KeyValueScanner memstoreScanner) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - InternalScanner s = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preFlushScannerOpen(ctx, store, - memstoreScanner, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(null, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<InternalScanner>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preFlushScannerOpen(ctx, store, memstoreScanner, getResult())); } - } - return s; + }); } /** @@ -714,27 +537,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postFlush() throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postFlush(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postFlush(ctx); } - } + }); } /** @@ -742,27 +551,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postFlush(final Store store, final StoreFile storeFile) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postFlush(ctx, store, storeFile); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postFlush(ctx, store, storeFile); } - } + }); } /** @@ -770,28 +565,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preSplit() throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preSplit(ctx); } - } - + }); } /** @@ -799,28 +579,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preSplit(final byte[] splitRow) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preSplit(ctx, splitRow); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preSplit(ctx, splitRow); } - } - + }); } /** @@ -830,79 +595,34 @@ public class RegionCoprocessorHost * @throws IOException */ public void postSplit(final HRegion l, final HRegion r) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postSplit(ctx, l, r); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postSplit(ctx, l, r); } - } + }); } - public boolean preSplitBeforePONR(final byte[] splitKey, + public boolean preSplitBeforePONR(final byte[] splitKey, final List<Mutation> metaEntries) throws IOException { - boolean bypass = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preSplitBeforePONR(ctx, splitKey, metaEntries); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preSplitBeforePONR(ctx, splitKey, metaEntries); } - } - return bypass; + }); } public void preSplitAfterPONR() throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preSplitAfterPONR(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preSplitAfterPONR(ctx); } - } + }); } /** @@ -910,27 +630,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void preRollBackSplit() throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preRollBackSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preRollBackSplit(ctx); } - } + }); } /** @@ -938,27 +644,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postRollBackSplit() throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postRollBackSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postRollBackSplit(ctx); } - } + }); } /** @@ -966,27 +658,13 @@ public class RegionCoprocessorHost * @throws IOException */ public void postCompleteSplit() throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postCompleteSplit(ctx); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postCompleteSplit(ctx); } - } + }); } // RegionObserver support @@ -1000,30 +678,13 @@ public class RegionCoprocessorHost */ public boolean preGetClosestRowBefore(final byte[] row, final byte[] family, final Result result) throws IOException { - boolean bypass = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preGetClosestRowBefore(ctx, row, family, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preGetClosestRowBefore(ctx, row, family, result); } - } - return bypass; + }); } /** @@ -1034,27 +695,13 @@ public class RegionCoprocessorHost */ public void postGetClosestRowBefore(final byte[] row, final byte[] family, final Result result) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postGetClosestRowBefore(ctx, row, family, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postGetClosestRowBefore(ctx, row, family, result); } - } + }); } /** @@ -1064,30 +711,13 @@ public class RegionCoprocessorHost */ public boolean preGet(final Get get, final List<Cell> results) throws IOException { - boolean bypass = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preGetOp(ctx, get, results); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preGetOp(ctx, get, results); } - } - return bypass; + }); } /** @@ -1096,28 +726,14 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public void postGet(final Get get, final List<Cell> results) - throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postGetOp(ctx, get, results); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + throws IOException { + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postGetOp(ctx, get, results); } - } + }); } /** @@ -1127,31 +743,14 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public Boolean preExists(final Get get) throws IOException { - boolean bypass = false; - boolean exists = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - exists = ((RegionObserver)env.getInstance()).preExists(ctx, get, exists); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, false, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preExists(ctx, get, getResult())); } - } - return bypass ? exists : null; + }); } /** @@ -1162,28 +761,14 @@ public class RegionCoprocessorHost */ public boolean postExists(final Get get, boolean exists) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - exists = ((RegionObserver)env.getInstance()).postExists(ctx, get, exists); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(exists, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.postExists(ctx, get, getResult())); } - } - return exists; + }); } /** @@ -1195,30 +780,13 @@ public class RegionCoprocessorHost */ public boolean prePut(final Put put, final WALEdit edit, final Durability durability) throws IOException { - boolean bypass = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).prePut(ctx, put, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.prePut(ctx, put, edit, durability); } - } - return bypass; + }); } /** @@ -1231,34 +799,15 @@ public class RegionCoprocessorHost * @exception IOException * Exception */ - public boolean prePrepareTimeStampForDeleteVersion(Mutation mutation, - Cell kv, byte[] byteNow, Get get) throws IOException { - boolean bypass = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()) - .prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, - byteNow, get); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + public boolean prePrepareTimeStampForDeleteVersion(final Mutation mutation, + final Cell kv, final byte[] byteNow, final Get get) throws IOException { + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.prePrepareTimeStampForDeleteVersion(ctx, mutation, kv, byteNow, get); } - } - return bypass; + }); } /** @@ -1269,27 +818,13 @@ public class RegionCoprocessorHost */ public void postPut(final Put put, final WALEdit edit, final Durability durability) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postPut(ctx, put, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postPut(ctx, put, edit, durability); } - } + }); } /** @@ -1301,30 +836,13 @@ public class RegionCoprocessorHost */ public boolean preDelete(final Delete delete, final WALEdit edit, final Durability durability) throws IOException { - boolean bypass = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preDelete(ctx, delete, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preDelete(ctx, delete, edit, durability); } - } - return bypass; + }); } /** @@ -1335,29 +853,15 @@ public class RegionCoprocessorHost */ public void postDelete(final Delete delete, final WALEdit edit, final Durability durability) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postDelete(ctx, delete, edit, durability); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postDelete(ctx, delete, edit, durability); } - } + }); } - + /** * @param miniBatchOp * @return true if default processing should be bypassed @@ -1365,31 +869,13 @@ public class RegionCoprocessorHost */ public boolean preBatchMutate( final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - boolean bypass = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).preBatchMutate(ctx, miniBatchOp); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preBatchMutate(ctx, miniBatchOp); } - } - - return bypass; + }); } /** @@ -1398,54 +884,25 @@ public class RegionCoprocessorHost */ public void postBatchMutate( final MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postBatchMutate(ctx, miniBatchOp); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postBatchMutate(ctx, miniBatchOp); } - } + }); } public void postBatchMutateIndispensably( final MiniBatchOperationInProgress<Mutation> miniBatchOp, final boolean success) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver) env.getInstance()).postBatchMutateIndispensably(ctx, miniBatchOp, - success); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postBatchMutateIndispensably(ctx, miniBatchOp, success); } - } + }); } /** @@ -1462,33 +919,16 @@ public class RegionCoprocessorHost public Boolean preCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Put put) - throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preCheckAndPut(ctx, row, family, qualifier, - compareOp, comparator, put, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + throws IOException { + return execOperationWithResult(true, false, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preCheckAndPut(ctx, row, family, qualifier, + compareOp, comparator, put, getResult())); } - } - return bypass ? result : null; + }); } /** @@ -1505,32 +945,15 @@ public class RegionCoprocessorHost public Boolean preCheckAndPutAfterRowLock(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Put put) throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preCheckAndPutAfterRowLock(ctx, row, - family, qualifier, compareOp, comparator, put, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, false, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preCheckAndPutAfterRowLock(ctx, row, family, qualifier, + compareOp, comparator, put, getResult())); } - } - return bypass ? result : null; + }); } /** @@ -1545,31 +968,16 @@ public class RegionCoprocessorHost public boolean postCheckAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Put put, - boolean result) - throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).postCheckAndPut(ctx, row, family, - qualifier, compareOp, comparator, put, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + boolean result) throws IOException { + return execOperationWithResult(result, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.postCheckAndPut(ctx, row, family, qualifier, + compareOp, comparator, put, getResult())); } - } - return result; + }); } /** @@ -1587,32 +995,15 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Delete delete) throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preCheckAndDelete(ctx, row, family, - qualifier, compareOp, comparator, delete, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, false, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preCheckAndDelete(ctx, row, family, + qualifier, compareOp, comparator, delete, getResult())); } - } - return bypass ? result : null; + }); } /** @@ -1629,32 +1020,15 @@ public class RegionCoprocessorHost public Boolean preCheckAndDeleteAfterRowLock(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Delete delete) throws IOException { - boolean bypass = false; - boolean result = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preCheckAndDeleteAfterRowLock(ctx, row, - family, qualifier, compareOp, comparator, delete, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, false, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preCheckAndDeleteAfterRowLock(ctx, row, + family, qualifier, compareOp, comparator, delete, getResult())); } - } - return bypass ? result : null; + }); } /** @@ -1670,29 +1044,15 @@ public class RegionCoprocessorHost final byte [] qualifier, final CompareOp compareOp, final ByteArrayComparable comparator, final Delete delete, boolean result) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).postCheckAndDelete(ctx, row, family, - qualifier, compareOp, comparator, delete, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(result, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.postCheckAndDelete(ctx, row, family, + qualifier, compareOp, comparator, delete, getResult())); } - } - return result; + }); } /** @@ -1702,31 +1062,14 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preAppend(final Append append) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preAppend(ctx, append); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, null, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preAppend(ctx, append)); } - } - return bypass ? result : null; + }); } /** @@ -1736,31 +1079,14 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preAppendAfterRowLock(final Append append) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preAppendAfterRowLock(ctx, append); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, null, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preAppendAfterRowLock(ctx, append)); } - } - return bypass ? result : null; + }); } /** @@ -1770,31 +1096,14 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preIncrement(final Increment increment) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).preIncrement(ctx, increment); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, null, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preIncrement(ctx, increment)); } - } - return bypass ? result : null; + }); } /** @@ -1804,31 +1113,14 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result preIncrementAfterRowLock(final Increment increment) throws IOException { - boolean bypass = false; - Result result = null; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver) env.getInstance()).preIncrementAfterRowLock(ctx, increment); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, null, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preIncrementAfterRowLock(ctx, increment)); } - } - return bypass ? result : null; + }); } /** @@ -1837,27 +1129,13 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public void postAppend(final Append append, final Result result) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postAppend(ctx, append, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postAppend(ctx, append, result); } - } + }); } /** @@ -1866,28 +1144,14 @@ public class RegionCoprocessorHost * @throws IOException if an error occurred on the coprocessor */ public Result postIncrement(final Increment increment, Result result) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - result = ((RegionObserver)env.getInstance()).postIncrement(ctx, increment, result); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(result, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Result>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.postIncrement(ctx, increment, getResult())); } - } - return result; + }); } /** @@ -1897,31 +1161,14 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public RegionScanner preScannerOpen(final Scan scan) throws IOException { - boolean bypass = false; - RegionScanner s = null; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver)env.getInstance()).preScannerOpen(ctx, scan, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, null, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preScannerOpen(ctx, scan, getResult())); } - } - return bypass ? s : null; + }); } /** @@ -1931,30 +1178,14 @@ public class RegionCoprocessorHost */ public KeyValueScanner preStoreScannerOpen(final Store store, final Scan scan, final NavigableSet<byte[]> targetCols) throws IOException { - KeyValueScanner s = null; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver) env.getInstance()).preStoreScannerOpen(ctx, store, scan, - targetCols, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(null, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<KeyValueScanner>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preStoreScannerOpen(ctx, store, scan, targetCols, getResult())); } - } - return s; + }); } /** @@ -1964,28 +1195,14 @@ public class RegionCoprocessorHost * @exception IOException Exception */ public RegionScanner postScannerOpen(final Scan scan, RegionScanner s) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - s = ((RegionObserver)env.getInstance()).postScannerOpen(ctx, scan, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(s, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<RegionScanner>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.postScannerOpen(ctx, scan, getResult())); } - } - return s; + }); } /** @@ -1998,32 +1215,14 @@ public class RegionCoprocessorHost */ public Boolean preScannerNext(final InternalScanner s, final List<Result> results, final int limit) throws IOException { - boolean bypass = false; - boolean hasNext = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - hasNext = ((RegionObserver)env.getInstance()).preScannerNext(ctx, s, results, limit, - hasNext); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, false, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.preScannerNext(ctx, s, results, limit, getResult())); } - } - - return bypass ? hasNext : null; + }); } /** @@ -2037,29 +1236,14 @@ public class RegionCoprocessorHost public boolean postScannerNext(final InternalScanner s, final List<Result> results, final int limit, boolean hasMore) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - hasMore = ((RegionObserver)env.getInstance()).postScannerNext(ctx, s, results, limit, - hasMore); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(hasMore, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.postScannerNext(ctx, s, results, limit, getResult())); } - } - return hasMore; + }); } /** @@ -2074,90 +1258,42 @@ public class RegionCoprocessorHost */ public boolean postScannerFilterRow(final InternalScanner s, final byte[] currentRow, final int offset, final short length) throws IOException { - boolean hasMore = true; // By default assume more rows there. - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env : coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - hasMore = ((RegionObserver) env.getInstance()).postScannerFilterRow(ctx, s, currentRow, - offset, length, hasMore); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperationWithResult(true, + coprocessors.isEmpty() ? null : new RegionOperationWithResult<Boolean>() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + setResult(oserver.postScannerFilterRow(ctx, s, currentRow, offset,length, getResult())); } - } - return hasMore; + }); } - + /** * @param s the scanner * @return true if default behavior should be bypassed, false otherwise * @exception IOException Exception */ public boolean preScannerClose(final InternalScanner s) throws IOException { - boolean bypass = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preScannerClose(ctx, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preScannerClose(ctx, s); } - } - - return bypass; + }); } /** * @exception IOException Exception */ public void postScannerClose(final InternalScanner s) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postScannerClose(ctx, s); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postScannerClose(ctx, s); } - } + }); } /** @@ -2169,30 +1305,13 @@ public class RegionCoprocessorHost */ public boolean preWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit) throws IOException { - boolean bypass = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preWALRestore(ctx, info, logKey, logEdit); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preWALRestore(ctx, info, logKey, logEdit); } - } - return bypass; + }); } /** @@ -2203,27 +1322,13 @@ public class RegionCoprocessorHost */ public void postWALRestore(final HRegionInfo info, final HLogKey logKey, final WALEdit logEdit) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).postWALRestore(ctx, info, logKey, logEdit); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.postWALRestore(ctx, info, logKey, logEdit); } - } + }); } /** @@ -2232,30 +1337,13 @@ public class RegionCoprocessorHost * @throws IOException */ public boolean preBulkLoadHFile(final List<Pair<byte[], String>> familyPaths) throws IOException { - boolean bypass = false; - ObserverContext<RegionCoprocessorEnvironment> ctx = null; - for (RegionEnvironment env: coprocessors) { - if (env.getInstance() instanceof RegionObserver) { - long startTime = System.nanoTime(); - ctx = ObserverContext.createAndPrepare(env, ctx); - Thread currentThread = Thread.currentThread(); - ClassLoader cl = currentThread.getContextClassLoader(); - try { - currentThread.setContextClassLoader(env.getClassLoader()); - ((RegionObserver)env.getInstance()).preBulkLoadHFile(ctx, familyPaths); - } catch (Throwable e) { - handleCoprocessorThrowable(env, e); - } finally { - currentThread.setContextClassLoader(cl); - } - bypass |= ctx.shouldBypass(); - env.offerExecutionLatency(System.nanoTime() - startTime); - if (ctx.shouldComplete()) { - break; - } + return execOperation(coprocessors.isEmpty() ? null : new RegionOperation() { + @Override + public void call(RegionObserver oserver, ObserverContext<RegionCoprocessorEnvironment> ctx) + throws IOException { + oserver.preBulkLoadHFile(ctx, familyPaths); } - } - return bypass; + }); } /** @@ -2266,77 +1354,34 @@ public class RegionCoprocessorHost */ public boolean postBulkLoadHFile(final List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException { - ObserverContext<RegionCoprocessorEnvironment> ctx =
<TRUNCATED>
