Author: stack
Date: Mon Apr 25 19:21:17 2011
New Revision: 1096568
URL: http://svn.apache.org/viewvc?rev=1096568&view=rev
Log:
HBASE-3812 Tidy up naming consistency and documentation in coprocessor framework
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
Removed:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserverCoprocessor.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALCoprocessors.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
Modified: hbase/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1096568&r1=1096567&r2=1096568&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Mon Apr 25 19:21:17 2011
@@ -183,6 +183,8 @@ Release 0.91.0 - Unreleased
HBASE-3609 Improve the selection of regions to balance; part 2 (Ted Yu)
HBASE-2939 Allow Client-Side Connection Pooling (Karthik Sankarachary)
HBASE-3798 [REST] Allow representation to elide row key and column key
+ HBASE-3812 Tidy up naming consistency and documentation in coprocessor
+ framework (Mingjie Lai)
TASKS
HBASE-3559 Move report of split to master OFF the heartbeat channel
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java?rev=1096568&r1=1096567&r2=1096568&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseMasterObserver.java
Mon Apr 25 19:21:17 2011
@@ -30,153 +30,153 @@ import java.io.IOException;
public class BaseMasterObserver implements MasterObserver {
@Override
- public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void preCreateTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
HTableDescriptor desc, byte[][] splitKeys) throws IOException {
}
@Override
- public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment>
env,
+ public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment>
ctx,
HRegionInfo[] regions, boolean sync) throws IOException {
}
@Override
- public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void preDeleteTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName) throws IOException {
}
@Override
- public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment>
env,
+ public void postDeleteTable(ObserverContext<MasterCoprocessorEnvironment>
ctx,
byte[] tableName) throws IOException {
}
@Override
- public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void preModifyTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName, HTableDescriptor htd) throws IOException {
}
@Override
- public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment>
env,
+ public void postModifyTable(ObserverContext<MasterCoprocessorEnvironment>
ctx,
byte[] tableName, HTableDescriptor htd) throws IOException {
}
@Override
- public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void preAddColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName, HColumnDescriptor column) throws IOException {
}
@Override
- public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void postAddColumn(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName, HColumnDescriptor column) throws IOException {
}
@Override
- public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment>
env,
+ public void preModifyColumn(ObserverContext<MasterCoprocessorEnvironment>
ctx,
byte[] tableName, HColumnDescriptor descriptor) throws IOException {
}
@Override
- public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment>
env,
+ public void postModifyColumn(ObserverContext<MasterCoprocessorEnvironment>
ctx,
byte[] tableName, HColumnDescriptor descriptor) throws IOException {
}
@Override
- public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment>
env,
+ public void preDeleteColumn(ObserverContext<MasterCoprocessorEnvironment>
ctx,
byte[] tableName, byte[] c) throws IOException {
}
@Override
- public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment>
env,
+ public void postDeleteColumn(ObserverContext<MasterCoprocessorEnvironment>
ctx,
byte[] tableName, byte[] c) throws IOException {
}
@Override
- public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void preEnableTable(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] tableName) throws IOException {
}
@Override
- public void postEnableTable(ObserverContext<MasterCoprocessorEnvironment>
env,
+ public void postEnableTable(ObserverContext<MasterCoprocessorEnvironment>
ctx,
byte[] tableName) throws IOException {
}
@Override
- public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment>
env,
+ public void preDisableTable(ObserverContext<MasterCoprocessorEnvironment>
ctx,
byte[] tableName) throws IOException {
}
@Override
- public void postDisableTable(ObserverContext<MasterCoprocessorEnvironment>
env,
+ public void postDisableTable(ObserverContext<MasterCoprocessorEnvironment>
ctx,
byte[] tableName) throws IOException {
}
@Override
- public void preMove(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void preMove(ObserverContext<MasterCoprocessorEnvironment> ctx,
HRegionInfo region, HServerInfo srcServer, HServerInfo destServer)
throws UnknownRegionException {
}
@Override
- public void postMove(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void postMove(ObserverContext<MasterCoprocessorEnvironment> ctx,
HRegionInfo region, HServerInfo srcServer, HServerInfo destServer)
throws UnknownRegionException {
}
@Override
- public void preAssign(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void preAssign(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] regionName, boolean force) throws IOException {
}
@Override
- public void postAssign(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void postAssign(ObserverContext<MasterCoprocessorEnvironment> ctx,
HRegionInfo regionInfo) throws IOException {
}
@Override
- public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void preUnassign(ObserverContext<MasterCoprocessorEnvironment> ctx,
byte[] regionName, boolean force) throws IOException {
}
@Override
- public void postUnassign(ObserverContext<MasterCoprocessorEnvironment> env,
+ public void postUnassign(ObserverContext<MasterCoprocessorEnvironment> ctx,
HRegionInfo regionInfo, boolean force) throws IOException {
}
@Override
- public void preBalance(ObserverContext<MasterCoprocessorEnvironment> env)
+ public void preBalance(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
@Override
- public void postBalance(ObserverContext<MasterCoprocessorEnvironment> env)
+ public void postBalance(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
@Override
- public boolean
preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> env,
+ public boolean
preBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment> ctx,
boolean b) throws IOException {
return b;
}
@Override
- public void postBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment>
env,
+ public void postBalanceSwitch(ObserverContext<MasterCoprocessorEnvironment>
ctx,
boolean oldValue, boolean newValue) throws IOException {
}
@Override
- public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> env)
+ public void preShutdown(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
@Override
- public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> env)
+ public void preStopMaster(ObserverContext<MasterCoprocessorEnvironment> ctx)
throws IOException {
}
@Override
- public void start(CoprocessorEnvironment env) throws IOException {
+ public void start(CoprocessorEnvironment ctx) throws IOException {
}
@Override
- public void stop(CoprocessorEnvironment env) throws IOException {
+ public void stop(CoprocessorEnvironment ctx) throws IOException {
}
}
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1096568&view=auto
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
(added)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
Mon Apr 25 19:21:17 2011
@@ -0,0 +1,243 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
+import org.apache.hadoop.hbase.filter.WritableByteArrayComparable;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+
+import java.io.IOException;
+
+/**
+ * An abstract class that implements RegionObserver.
+ * By extending it, you can create your own region observer without
+ * overriding all abstract methods of RegionObserver.
+ */
+public abstract class BaseRegionObserver implements RegionObserver {
+ @Override
+ public void start(CoprocessorEnvironment e) { }
+
+ @Override
+ public void stop(CoprocessorEnvironment e) { }
+
+ @Override
+ public void preOpen(ObserverContext<RegionCoprocessorEnvironment> e) { }
+
+ @Override
+ public void postOpen(ObserverContext<RegionCoprocessorEnvironment> e) { }
+
+ @Override
+ public void preClose(ObserverContext<RegionCoprocessorEnvironment> e,
+ boolean abortRequested) { }
+
+ @Override
+ public void postClose(ObserverContext<RegionCoprocessorEnvironment> e,
+ boolean abortRequested) { }
+
+ @Override
+ public void preFlush(ObserverContext<RegionCoprocessorEnvironment> e) { }
+
+ @Override
+ public void postFlush(ObserverContext<RegionCoprocessorEnvironment> e) { }
+
+ @Override
+ public void preSplit(ObserverContext<RegionCoprocessorEnvironment> e) { }
+
+ @Override
+ public void postSplit(ObserverContext<RegionCoprocessorEnvironment> e,
+ HRegion l, HRegion r) { }
+
+ @Override
+ public void preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
+ boolean willSplit) { }
+
+ @Override
+ public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e,
+ boolean willSplit) { }
+
+ @Override
+ public void preGetClosestRowBefore(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final byte [] row, final byte [] family, final Result result)
+ throws IOException {
+ }
+
+ @Override
+ public void postGetClosestRowBefore(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final byte [] row, final byte [] family, final Result result)
+ throws IOException {
+ }
+
+ @Override
+ public void preGet(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Get get, final List<KeyValue> results) throws IOException {
+ }
+
+ @Override
+ public void postGet(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Get get, final List<KeyValue> results) throws IOException {
+ }
+
+ @Override
+ public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment>
e,
+ final Get get, final boolean exists) throws IOException {
+ return exists;
+ }
+
+ @Override
+ public boolean postExists(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final Get get, boolean exists) throws IOException {
+ return exists;
+ }
+
+ @Override
+ public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
+ }
+
+ @Override
+ public void postPut(final ObserverContext<RegionCoprocessorEnvironment> e,
final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
+ }
+
+ @Override
+ public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
final Map<byte[],
+ List<KeyValue>> familyMap, final boolean writeToWAL) throws IOException {
+ }
+
+ @Override
+ public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
+ throws IOException {
+ }
+
+ @Override
+ public boolean preCheckAndPut(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final byte [] row, final byte [] family, final byte [] qualifier,
+ final CompareOp compareOp, final WritableByteArrayComparable comparator,
+ final Put put, final boolean result) throws IOException {
+ return result;
+ }
+
+ @Override
+ public boolean postCheckAndPut(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final byte [] row, final byte [] family, final byte [] qualifier,
+ final CompareOp compareOp, final WritableByteArrayComparable comparator,
+ final Put put, final boolean result) throws IOException {
+ return result;
+ }
+
+ @Override
+ public boolean preCheckAndDelete(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final byte [] row, final byte [] family, final byte [] qualifier,
+ final CompareOp compareOp, final WritableByteArrayComparable comparator,
+ final Delete delete, final boolean result) throws IOException {
+ return result;
+ }
+
+ @Override
+ public boolean postCheckAndDelete(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final byte [] row, final byte [] family, final byte [] qualifier,
+ final CompareOp compareOp, final WritableByteArrayComparable comparator,
+ final Delete delete, final boolean result) throws IOException {
+ return result;
+ }
+
+ @Override
+ public long preIncrementColumnValue(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final byte [] row, final byte [] family, final byte [] qualifier,
+ final long amount, final boolean writeToWAL) throws IOException {
+ return amount;
+ }
+
+ @Override
+ public long postIncrementColumnValue(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final byte [] row, final byte [] family, final byte [] qualifier,
+ final long amount, final boolean writeToWAL, long result)
+ throws IOException {
+ return result;
+ }
+
+ @Override
+ public void preIncrement(final ObserverContext<RegionCoprocessorEnvironment>
e,
+ final Increment increment, final Result result) throws IOException {
+ }
+
+ @Override
+ public void postIncrement(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final Increment increment, final Result result) throws IOException {
+ }
+
+ @Override
+ public InternalScanner preScannerOpen(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final Scan scan, final InternalScanner s) throws IOException {
+ return s;
+ }
+
+ @Override
+ public InternalScanner postScannerOpen(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final Scan scan, final InternalScanner s) throws IOException {
+ return s;
+ }
+
+ @Override
+ public boolean preScannerNext(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final InternalScanner s, final List<Result> results,
+ final int limit, final boolean hasMore) throws IOException {
+ return hasMore;
+ }
+
+ @Override
+ public boolean postScannerNext(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final InternalScanner s, final List<Result> results, final int limit,
+ final boolean hasMore) throws IOException {
+ return hasMore;
+ }
+
+ @Override
+ public void preScannerClose(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final InternalScanner s) throws IOException {
+ }
+
+ @Override
+ public void postScannerClose(final
ObserverContext<RegionCoprocessorEnvironment> e,
+ final InternalScanner s) throws IOException {
+ }
+
+ @Override
+ public void preWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
HRegionInfo info,
+ HLogKey logKey, WALEdit logEdit) throws IOException {
+ }
+
+ @Override
+ public void postWALRestore(ObserverContext<RegionCoprocessorEnvironment> env,
+ HRegionInfo info, HLogKey logKey, WALEdit logEdit) throws IOException {
+ }
+}
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1096568&r1=1096567&r2=1096568&view=diff
==============================================================================
---
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
(original)
+++
hbase/trunk/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
Mon Apr 25 19:21:17 2011
@@ -46,25 +46,25 @@ public interface RegionObserver extends
* Called before the region is reported as open to the master.
* @param c the environment provided by the region server
*/
- public void preOpen(final ObserverContext<RegionCoprocessorEnvironment> c);
+ void preOpen(final ObserverContext<RegionCoprocessorEnvironment> c);
/**
* Called after the region is reported as open to the master.
* @param c the environment provided by the region server
*/
- public void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c);
+ void postOpen(final ObserverContext<RegionCoprocessorEnvironment> c);
/**
* Called before the memstore is flushed to disk.
* @param c the environment provided by the region server
*/
- public void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c);
+ void preFlush(final ObserverContext<RegionCoprocessorEnvironment> c);
/**
* Called after the memstore is flushed to disk.
* @param c the environment provided by the region server
*/
- public void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c);
+ void postFlush(final ObserverContext<RegionCoprocessorEnvironment> c);
/**
* Called before compaction.
@@ -72,7 +72,7 @@ public interface RegionObserver extends
* @param willSplit true if compaction will result in a split, false
* otherwise
*/
- public void preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
+ void preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
final boolean willSplit);
/**
@@ -81,7 +81,7 @@ public interface RegionObserver extends
* @param willSplit true if compaction will result in a split, false
* otherwise
*/
- public void postCompact(final ObserverContext<RegionCoprocessorEnvironment>
c,
+ void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
final boolean willSplit);
/**
@@ -89,7 +89,7 @@ public interface RegionObserver extends
* @param c the environment provided by the region server
* (e.getRegion() returns the parent region)
*/
- public void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c);
+ void preSplit(final ObserverContext<RegionCoprocessorEnvironment> c);
/**
* Called after the region is split.
@@ -98,7 +98,7 @@ public interface RegionObserver extends
* @param l the left daughter region
* @param r the right daughter region
*/
- public void postSplit(final ObserverContext<RegionCoprocessorEnvironment> c,
final HRegion l,
+ void postSplit(final ObserverContext<RegionCoprocessorEnvironment> c, final
HRegion l,
final HRegion r);
/**
@@ -106,7 +106,7 @@ public interface RegionObserver extends
* @param c the environment provided by the region server
* @param abortRequested true if the region server is aborting
*/
- public void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
+ void preClose(final ObserverContext<RegionCoprocessorEnvironment> c,
boolean abortRequested);
/**
@@ -114,7 +114,7 @@ public interface RegionObserver extends
* @param c the environment provided by the region server
* @param abortRequested true if the region server is aborting
*/
- public void postClose(final ObserverContext<RegionCoprocessorEnvironment> c,
+ void postClose(final ObserverContext<RegionCoprocessorEnvironment> c,
boolean abortRequested);
/**
@@ -132,7 +132,7 @@ public interface RegionObserver extends
* is not bypassed.
* @throws IOException if an error occurred on the coprocessor
*/
- public void preGetClosestRowBefore(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ void preGetClosestRowBefore(final
ObserverContext<RegionCoprocessorEnvironment> c,
final byte [] row, final byte [] family, final Result result)
throws IOException;
@@ -147,7 +147,7 @@ public interface RegionObserver extends
* @param result the result to return to the client, modify as necessary
* @throws IOException if an error occurred on the coprocessor
*/
- public void postGetClosestRowBefore(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ void postGetClosestRowBefore(final
ObserverContext<RegionCoprocessorEnvironment> c,
final byte [] row, final byte [] family, final Result result)
throws IOException;
@@ -165,7 +165,7 @@ public interface RegionObserver extends
* is not bypassed.
* @throws IOException if an error occurred on the coprocessor
*/
- public void preGet(final ObserverContext<RegionCoprocessorEnvironment> c,
final Get get,
+ void preGet(final ObserverContext<RegionCoprocessorEnvironment> c, final Get
get,
final List<KeyValue> result)
throws IOException;
@@ -179,7 +179,7 @@ public interface RegionObserver extends
* @param result the result to return to the client, modify as necessary
* @throws IOException if an error occurred on the coprocessor
*/
- public void postGet(final ObserverContext<RegionCoprocessorEnvironment> c,
final Get get,
+ void postGet(final ObserverContext<RegionCoprocessorEnvironment> c, final
Get get,
final List<KeyValue> result)
throws IOException;
@@ -196,7 +196,7 @@ public interface RegionObserver extends
* @return the value to return to the client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
- public boolean preExists(final ObserverContext<RegionCoprocessorEnvironment>
c, final Get get,
+ boolean preExists(final ObserverContext<RegionCoprocessorEnvironment> c,
final Get get,
final boolean exists)
throws IOException;
@@ -211,7 +211,7 @@ public interface RegionObserver extends
* @return the result to return to the client
* @throws IOException if an error occurred on the coprocessor
*/
- public boolean postExists(final
ObserverContext<RegionCoprocessorEnvironment> c, final Get get,
+ boolean postExists(final ObserverContext<RegionCoprocessorEnvironment> c,
final Get get,
final boolean exists)
throws IOException;
@@ -227,7 +227,7 @@ public interface RegionObserver extends
* @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
- public void prePut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Map<byte[],
+ void prePut(final ObserverContext<RegionCoprocessorEnvironment> c, final
Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
@@ -241,7 +241,7 @@ public interface RegionObserver extends
* @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
- public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
final Map<byte[],
+ void postPut(final ObserverContext<RegionCoprocessorEnvironment> c, final
Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
@@ -257,7 +257,7 @@ public interface RegionObserver extends
* @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
- public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Map<byte[],
+ void preDelete(final ObserverContext<RegionCoprocessorEnvironment> c, final
Map<byte[],
List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
@@ -271,7 +271,7 @@ public interface RegionObserver extends
* @param writeToWAL true if the change should be written to the WAL
* @throws IOException if an error occurred on the coprocessor
*/
- public void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
+ void postDelete(final ObserverContext<RegionCoprocessorEnvironment> c,
final Map<byte[], List<KeyValue>> familyMap, final boolean writeToWAL)
throws IOException;
@@ -294,7 +294,7 @@ public interface RegionObserver extends
* processing
* @throws IOException if an error occurred on the coprocessor
*/
- public boolean preCheckAndPut(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ boolean preCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment> c,
final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final WritableByteArrayComparable comparator,
final Put put, final boolean result)
@@ -316,7 +316,7 @@ public interface RegionObserver extends
* @return the possibly transformed return value to return to client
* @throws IOException if an error occurred on the coprocessor
*/
- public boolean postCheckAndPut(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ boolean postCheckAndPut(final ObserverContext<RegionCoprocessorEnvironment>
c,
final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final WritableByteArrayComparable comparator,
final Put put, final boolean result)
@@ -340,7 +340,7 @@ public interface RegionObserver extends
* @return the value to return to client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
- public boolean preCheckAndDelete(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ boolean preCheckAndDelete(final
ObserverContext<RegionCoprocessorEnvironment> c,
final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final WritableByteArrayComparable comparator,
final Delete delete, final boolean result)
@@ -362,7 +362,7 @@ public interface RegionObserver extends
* @return the possibly transformed returned value to return to client
* @throws IOException if an error occurred on the coprocessor
*/
- public boolean postCheckAndDelete(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ boolean postCheckAndDelete(final
ObserverContext<RegionCoprocessorEnvironment> c,
final byte [] row, final byte [] family, final byte [] qualifier,
final CompareOp compareOp, final WritableByteArrayComparable comparator,
final Delete delete, final boolean result)
@@ -384,7 +384,7 @@ public interface RegionObserver extends
* @return value to return to the client if bypassing default processing
* @throws IOException if an error occurred on the coprocessor
*/
- public long preIncrementColumnValue(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ long preIncrementColumnValue(final
ObserverContext<RegionCoprocessorEnvironment> c,
final byte [] row, final byte [] family, final byte [] qualifier,
final long amount, final boolean writeToWAL)
throws IOException;
@@ -404,7 +404,7 @@ public interface RegionObserver extends
* @return the result to return to the client
* @throws IOException if an error occurred on the coprocessor
*/
- public long postIncrementColumnValue(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ long postIncrementColumnValue(final
ObserverContext<RegionCoprocessorEnvironment> c,
final byte [] row, final byte [] family, final byte [] qualifier,
final long amount, final boolean writeToWAL, final long result)
throws IOException;
@@ -423,7 +423,7 @@ public interface RegionObserver extends
* is not bypassed.
* @throws IOException if an error occurred on the coprocessor
*/
- public void preIncrement(final ObserverContext<RegionCoprocessorEnvironment>
c,
+ void preIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
final Increment increment, final Result result)
throws IOException;
@@ -437,7 +437,7 @@ public interface RegionObserver extends
* @param result the result returned by increment, can be modified
* @throws IOException if an error occurred on the coprocessor
*/
- public void postIncrement(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ void postIncrement(final ObserverContext<RegionCoprocessorEnvironment> c,
final Increment increment, final Result result)
throws IOException;
@@ -455,7 +455,7 @@ public interface RegionObserver extends
* overriding default behavior, null otherwise
* @throws IOException if an error occurred on the coprocessor
*/
- public InternalScanner preScannerOpen(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ InternalScanner preScannerOpen(final
ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final InternalScanner s)
throws IOException;
@@ -470,7 +470,7 @@ public interface RegionObserver extends
* @return the scanner instance to use
* @throws IOException if an error occurred on the coprocessor
*/
- public InternalScanner postScannerOpen(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ InternalScanner postScannerOpen(final
ObserverContext<RegionCoprocessorEnvironment> c,
final Scan scan, final InternalScanner s)
throws IOException;
@@ -491,7 +491,7 @@ public interface RegionObserver extends
* @return 'has more' indication that should be sent to client
* @throws IOException if an error occurred on the coprocessor
*/
- public boolean preScannerNext(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ boolean preScannerNext(final ObserverContext<RegionCoprocessorEnvironment> c,
final InternalScanner s, final List<Result> result,
final int limit, final boolean hasNext)
throws IOException;
@@ -509,7 +509,7 @@ public interface RegionObserver extends
* @return 'has more' indication that should be sent to client
* @throws IOException if an error occurred on the coprocessor
*/
- public boolean postScannerNext(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ boolean postScannerNext(final ObserverContext<RegionCoprocessorEnvironment>
c,
final InternalScanner s, final List<Result> result, final int limit,
final boolean hasNext)
throws IOException;
@@ -525,7 +525,7 @@ public interface RegionObserver extends
* @param s the scanner
* @throws IOException if an error occurred on the coprocessor
*/
- public void preScannerClose(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ void preScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
final InternalScanner s)
throws IOException;
@@ -538,7 +538,7 @@ public interface RegionObserver extends
* @param s the scanner
* @throws IOException if an error occurred on the coprocessor
*/
- public void postScannerClose(final
ObserverContext<RegionCoprocessorEnvironment> c,
+ void postScannerClose(final ObserverContext<RegionCoprocessorEnvironment> c,
final InternalScanner s)
throws IOException;
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java?rev=1096568&r1=1096567&r2=1096568&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SampleRegionWALObserver.java
Mon Apr 25 19:21:17 2011
@@ -33,15 +33,14 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
/**
- * Class for testing WAL coprocessor extension. WAL write monitor is defined
- * in LogObserver while WAL Restore is in RegionObserver.
+ * Class for testing WALObserver coprocessor.
*
- * It will monitor a WAL writing and Restore, modify passed-in WALEdit, i.e,
- * ignore specified columns when writing, and add a KeyValue. On the other
- * hand, it checks whether the ignored column is still in WAL when Restoreed
+ * It will monitor WAL writing and restoring, and modify passed-in WALEdit, i.e.,
+ * ignore specified columns when writing, or add a KeyValue. On the other
+ * side, it checks whether the ignored column is still in WAL when restored
* at region reconstruct.
*/
-public class SampleRegionWALObserver extends BaseRegionObserverCoprocessor
+public class SampleRegionWALObserver extends BaseRegionObserver
implements WALObserver {
private static final Log LOG =
LogFactory.getLog(SampleRegionWALObserver.class);
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java?rev=1096568&r1=1096567&r2=1096568&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/SimpleRegionObserver.java
Mon Apr 25 19:21:17 2011
@@ -43,7 +43,7 @@ import org.apache.hadoop.hbase.util.Byte
* A sample region observer that tests the RegionObserver interface.
* It works with TestRegionObserverInterface to provide the test case.
*/
-public class SimpleRegionObserver extends BaseRegionObserverCoprocessor {
+public class SimpleRegionObserver extends BaseRegionObserver {
static final Log LOG = LogFactory.getLog(TestRegionObserverInterface.class);
boolean beforeDelete = true;
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java?rev=1096568&r1=1096567&r2=1096568&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
Mon Apr 25 19:21:17 2011
@@ -47,7 +47,7 @@ public class TestCoprocessorInterface ex
private static final HBaseTestingUtility TEST_UTIL =
new HBaseTestingUtility();
- public static class CoprocessorImpl extends BaseRegionObserverCoprocessor {
+ public static class CoprocessorImpl extends BaseRegionObserver {
private boolean startCalled;
private boolean stopCalled;
Modified:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java?rev=1096568&r1=1096567&r2=1096568&view=diff
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
(original)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionObserverStacking.java
Mon Apr 25 19:21:17 2011
@@ -42,7 +42,7 @@ import junit.framework.TestCase;
public class TestRegionObserverStacking extends TestCase {
static final String DIR = "test/build/data/TestRegionObserverStacking/";
- public static class ObserverA extends BaseRegionObserverCoprocessor {
+ public static class ObserverA extends BaseRegionObserver {
long id;
@Override
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -56,7 +56,7 @@ public class TestRegionObserverStacking
}
}
- public static class ObserverB extends BaseRegionObserverCoprocessor {
+ public static class ObserverB extends BaseRegionObserver {
long id;
@Override
public void postPut(final ObserverContext<RegionCoprocessorEnvironment> c,
@@ -70,7 +70,7 @@ public class TestRegionObserverStacking
}
}
- public static class ObserverC extends BaseRegionObserverCoprocessor {
+ public static class ObserverC extends BaseRegionObserver {
long id;
@Override
Added:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java?rev=1096568&view=auto
==============================================================================
---
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
(added)
+++
hbase/trunk/src/test/java/org/apache/hadoop/hbase/coprocessor/TestWALObserver.java
Mon Apr 25 19:21:17 2011
@@ -0,0 +1,373 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.coprocessor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.regionserver.wal.HLogSplitter;
+import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdge;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.*;
+
+/**
+ * Tests invocation of the {@link org.apache.hadoop.hbase.coprocessor.WALObserver}
+ * interface hooks when edits are written to the WAL, and of the WAL restore
+ * hooks when a region replays recovered edits.
+ */
+public class TestWALObserver {
+ private static final Log LOG = LogFactory.getLog(TestWALObserver.class);
+ private final static HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
+
+ private static byte[] TEST_TABLE = Bytes.toBytes("observedTable");
+ private static byte[][] TEST_FAMILY = { Bytes.toBytes("fam1"),
+ Bytes.toBytes("fam2"),
+ Bytes.toBytes("fam3"),
+ };
+ private static byte[][] TEST_QUALIFIER = { Bytes.toBytes("q1"),
+ Bytes.toBytes("q2"),
+ Bytes.toBytes("q3"),
+ };
+ private static byte[][] TEST_VALUE = { Bytes.toBytes("v1"),
+ Bytes.toBytes("v2"),
+ Bytes.toBytes("v3"),
+ };
+ private static byte[] TEST_ROW = Bytes.toBytes("testRow");
+
+ private Configuration conf;
+ private FileSystem fs;
+ private Path dir;
+ private MiniDFSCluster cluster;
+ private Path hbaseRootDir;
+ private Path oldLogDir;
+ private Path logDir;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
+ SampleRegionWALObserver.class.getName());
+ conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
+ SampleRegionWALObserver.class.getName());
+ conf.setBoolean("dfs.support.append", true);
+ conf.setInt("dfs.client.block.recovery.retries", 2);
+ conf.setInt("hbase.regionserver.flushlogentries", 1);
+
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.setNameNodeNameSystemLeasePeriod(100, 10000);
+ Path hbaseRootDir =
+ TEST_UTIL.getDFSCluster().getFileSystem().makeQualified(new
Path("/hbase"));
+ LOG.info("hbase.rootdir=" + hbaseRootDir);
+ conf.set(HConstants.HBASE_DIR, hbaseRootDir.toString());
+ }
+
+ @AfterClass
+ public static void teardownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ this.conf = HBaseConfiguration.create(TEST_UTIL.getConfiguration());
+ //this.cluster = TEST_UTIL.getDFSCluster();
+ this.fs = TEST_UTIL.getDFSCluster().getFileSystem();
+ this.hbaseRootDir = new Path(conf.get(HConstants.HBASE_DIR));
+ this.dir = new Path(this.hbaseRootDir, TestWALObserver.class.getName());
+ this.oldLogDir = new Path(this.hbaseRootDir,
HConstants.HREGION_OLDLOGDIR_NAME);
+ this.logDir = new Path(this.hbaseRootDir, HConstants.HREGION_LOGDIR_NAME);
+
+ if (TEST_UTIL.getDFSCluster().getFileSystem().exists(this.hbaseRootDir)) {
+ TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir,
true);
+ }
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ TEST_UTIL.getDFSCluster().getFileSystem().delete(this.hbaseRootDir, true);
+ }
+
+ /**
+ * Test WAL write behavior with WALObserver. The coprocessor monitors
+ * a WALEdit written to WAL, and ignores, modifies, and adds KeyValues for the
+ * WALEdit.
+ */
+ @Test
+ public void testWALObserverWriteToWAL() throws Exception {
+ HRegionInfo hri =
createBasic3FamilyHRegionInfo(Bytes.toString(TEST_TABLE));
+ Path basedir = new Path(this.hbaseRootDir, Bytes.toString(TEST_TABLE));
+ deleteDir(basedir);
+ fs.mkdirs(new Path(basedir, hri.getEncodedName()));
+
+ HLog log = new HLog(this.fs, this.dir, this.oldLogDir, this.conf);
+ SampleRegionWALObserver cp = getCoprocessor(log);
+
+ // TEST_FAMILY[0] shall be removed from WALEdit.
+ // TEST_FAMILY[1] value shall be changed.
+ // TEST_FAMILY[2] shall be added to WALEdit, although it's not in the put.
+ cp.setTestValues(TEST_TABLE, TEST_ROW, TEST_FAMILY[0], TEST_QUALIFIER[0],
+ TEST_FAMILY[1], TEST_QUALIFIER[1],
+ TEST_FAMILY[2], TEST_QUALIFIER[2]);
+
+ assertFalse(cp.isPreWALWriteCalled());
+ assertFalse(cp.isPostWALWriteCalled());
+
+ // TEST_FAMILY[2] is not in the put, however it shall be added by the
tested
+ // coprocessor.
+ // Use a Put to create familyMap.
+ Put p = creatPutWith2Families(TEST_ROW);
+
+ Map<byte [], List<KeyValue>> familyMap = p.getFamilyMap();
+ WALEdit edit = new WALEdit();
+ addFamilyMapToWALEdit(familyMap, edit);
+
+ boolean foundFamily0 = false;
+ boolean foundFamily2 = false;
+ boolean modifiedFamily1 = false;
+
+ List<KeyValue> kvs = edit.getKeyValues();
+
+ for (KeyValue kv : kvs) {
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
+ foundFamily0 = true;
+ }
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
+ foundFamily2 = true;
+ }
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
+ if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
+ modifiedFamily1 = true;
+ }
+ }
+ }
+ assertTrue(foundFamily0);
+ assertFalse(foundFamily2);
+ assertFalse(modifiedFamily1);
+
+ // it's where WAL write cp should occur.
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ log.append(hri, hri.getTableDesc().getName(), edit, now);
+
+ // the edit shall have been changed now by the coprocessor.
+ foundFamily0 = false;
+ foundFamily2 = false;
+ modifiedFamily1 = false;
+ for (KeyValue kv : kvs) {
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[0])) {
+ foundFamily0 = true;
+ }
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[2])) {
+ foundFamily2 = true;
+ }
+ if (Arrays.equals(kv.getFamily(), TEST_FAMILY[1])) {
+ if (!Arrays.equals(kv.getValue(), TEST_VALUE[1])) {
+ modifiedFamily1 = true;
+ }
+ }
+ }
+ assertFalse(foundFamily0);
+ assertTrue(foundFamily2);
+ assertTrue(modifiedFamily1);
+
+ assertTrue(cp.isPreWALWriteCalled());
+ assertTrue(cp.isPostWALWriteCalled());
+ }
+
+ /**
+ * Test WAL replay behavior with WALObserver.
+ */
+ @Test
+ public void testWALObserverReplay() throws Exception {
+ // WAL replay is handled at HRegion::replayRecoveredEdits(), which is
+ // ultimately called by HRegion::initialize()
+ byte[] tableName = Bytes.toBytes("testWALCoprocessorReplay");
+
+ final HRegionInfo hri =
createBasic3FamilyHRegionInfo(Bytes.toString(tableName));
+ final Path basedir = new Path(this.hbaseRootDir,
Bytes.toString(tableName));
+ deleteDir(basedir);
+ fs.mkdirs(new Path(basedir, hri.getEncodedName()));
+
+ //HLog wal = new HLog(this.fs, this.dir, this.oldLogDir, this.conf);
+ HLog wal = createWAL(this.conf);
+ //Put p = creatPutWith2Families(TEST_ROW);
+ WALEdit edit = new WALEdit();
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ //addFamilyMapToWALEdit(p.getFamilyMap(), edit);
+ final int countPerFamily = 1000;
+ for (HColumnDescriptor hcd: hri.getTableDesc().getFamilies()) {
+ addWALEdits(tableName, hri, TEST_ROW, hcd.getName(), countPerFamily,
+ EnvironmentEdgeManager.getDelegate(), wal);
+ }
+ wal.append(hri, tableName, edit, now);
+ // sync to fs.
+ wal.sync();
+
+ final Configuration newConf = HBaseConfiguration.create(this.conf);
+ User user = HBaseTestingUtility.getDifferentUser(newConf,
+ ".replay.wal.secondtime");
+ user.runAs(new PrivilegedExceptionAction() {
+ public Object run() throws Exception {
+ runWALSplit(newConf);
+ FileSystem newFS = FileSystem.get(newConf);
+ // Make a new wal for new region open.
+ HLog wal2 = createWAL(newConf);
+ HRegion region2 = new HRegion(basedir, wal2, FileSystem.get(newConf),
+ newConf, hri, TEST_UTIL.getHBaseCluster().getRegionServer(0));
+ long seqid2 = region2.initialize();
+
+ SampleRegionWALObserver cp2 =
+
(SampleRegionWALObserver)region2.getCoprocessorHost().findCoprocessor(
+ SampleRegionWALObserver.class.getName());
+ // TODO: asserting here is problematic.
+ assertNotNull(cp2);
+ assertTrue(cp2.isPreWALRestoreCalled());
+ assertTrue(cp2.isPostWALRestoreCalled());
+ region2.close();
+ wal2.closeAndDelete();
+ return null;
+ }
+ });
+ }
+ /**
+ * Test to see CP loaded successfully or not. There is a duplication
+ * at TestHLog, but the purpose of that one is to see whether the loaded
+ * CP will impact existing HLog tests or not.
+ */
+ @Test
+ public void testWALObserverLoaded() throws Exception {
+ HLog log = new HLog(fs, dir, oldLogDir, conf);
+ assertNotNull(getCoprocessor(log));
+ }
+
+ private SampleRegionWALObserver getCoprocessor(HLog wal) throws Exception {
+ WALCoprocessorHost host = wal.getCoprocessorHost();
+ Coprocessor c =
host.findCoprocessor(SampleRegionWALObserver.class.getName());
+ return (SampleRegionWALObserver)c;
+ }
+
+ /*
+ * Creates an HRI around an HTD that has <code>tableName</code> and one
+ * column family for each entry of TEST_FAMILY.
+ * @param tableName Name of table to use when we create HTableDescriptor.
+ */
+ private HRegionInfo createBasic3FamilyHRegionInfo(final String tableName) {
+ HTableDescriptor htd = new HTableDescriptor(tableName);
+
+ for (int i = 0; i < TEST_FAMILY.length; i++ ) {
+ HColumnDescriptor a = new HColumnDescriptor(TEST_FAMILY[i]);
+ htd.addFamily(a);
+ }
+ return new HRegionInfo(htd, null, null, false);
+ }
+
+ /*
+ * @param p Directory to cleanup
+ */
+ private void deleteDir(final Path p) throws IOException {
+ if (this.fs.exists(p)) {
+ if (!this.fs.delete(p, true)) {
+ throw new IOException("Failed remove of " + p);
+ }
+ }
+ }
+
+ private Put creatPutWith2Families(byte[] row) throws IOException {
+ Put p = new Put(row);
+ for (int i = 0; i < TEST_FAMILY.length-1; i++ ) {
+ p.add(TEST_FAMILY[i], TEST_QUALIFIER[i],
+ TEST_VALUE[i]);
+ }
+ return p;
+ }
+
+ /**
+ * Copied from HRegion.
+ *
+ * @param familyMap map of family->edits
+ * @param walEdit the destination entry to append into
+ */
+ private void addFamilyMapToWALEdit(Map<byte[], List<KeyValue>> familyMap,
+ WALEdit walEdit) {
+ for (List<KeyValue> edits : familyMap.values()) {
+ for (KeyValue kv : edits) {
+ walEdit.add(kv);
+ }
+ }
+ }
+ private Path runWALSplit(final Configuration c) throws IOException {
+ FileSystem fs = FileSystem.get(c);
+ HLogSplitter logSplitter = HLogSplitter.createLogSplitter(c,
+ this.hbaseRootDir, this.logDir, this.oldLogDir, fs);
+ List<Path> splits = logSplitter.splitLog();
+ // Split should generate only 1 file since there's only 1 region
+ assertEquals(1, splits.size());
+ // Make sure the file exists
+ assertTrue(fs.exists(splits.get(0)));
+ LOG.info("Split file=" + splits.get(0));
+ return splits.get(0);
+ }
+ private HLog createWAL(final Configuration c) throws IOException {
+ HLog wal = new HLog(FileSystem.get(c), logDir, oldLogDir, c);
+ return wal;
+ }
+ private void addWALEdits (final byte [] tableName, final HRegionInfo hri,
+ final byte [] rowName, final byte [] family,
+ final int count, EnvironmentEdge ee, final HLog wal)
+ throws IOException {
+ String familyStr = Bytes.toString(family);
+ for (int j = 0; j < count; j++) {
+ byte[] qualifierBytes = Bytes.toBytes(Integer.toString(j));
+ byte[] columnBytes = Bytes.toBytes(familyStr + ":" +
Integer.toString(j));
+ WALEdit edit = new WALEdit();
+ edit.add(new KeyValue(rowName, family, qualifierBytes,
+ ee.currentTimeMillis(), columnBytes));
+ wal.append(hri, tableName, edit, ee.currentTimeMillis());
+ }
+ }
+}
+