Author: jyates
Date: Wed Jul 31 20:16:43 2013
New Revision: 1509015
URL: http://svn.apache.org/r1509015
Log:
HBASE-9049: Generalize ServerCallable creation to support custom callables
Added:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
Removed:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCaller.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java?rev=1509015&r1=1509014&r2=1509015&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
(original)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
Wed Jul 31 20:16:43 2013
@@ -101,6 +101,7 @@ class AsyncProcess<CResult> {
protected int numTries;
protected final boolean useServerTrackerForRetries;
protected int serverTrackerTimeout;
+ protected RpcRetryingCallerFactory rpcCallerFactory;
/**
@@ -167,7 +168,8 @@ class AsyncProcess<CResult> {
}
public AsyncProcess(HConnection hc, byte[] tableName, ExecutorService pool,
- AsyncProcessCallback<CResult> callback, Configuration
conf) {
+ AsyncProcessCallback<CResult> callback, Configuration conf,
+ RpcRetryingCallerFactory rpcCaller) {
this.hConnection = hc;
this.tableName = tableName;
this.pool = pool;
@@ -201,6 +203,8 @@ class AsyncProcess<CResult> {
serverTrackerTimeout += ConnectionUtils.getPauseTime(this.pause, i);
}
}
+
+ this.rpcCallerFactory = rpcCaller;
}
/**
@@ -452,7 +456,7 @@ class AsyncProcess<CResult> {
*/
protected RpcRetryingCaller<MultiResponse>
createCaller(MultiServerCallable<Row> callable) {
// callable is unused.
- return new RpcRetryingCaller<MultiResponse>();
+ return rpcCallerFactory.<MultiResponse> newCaller();
}
/**
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java?rev=1509015&r1=1509014&r2=1509015&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
(original)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
Wed Jul 31 20:16:43 2013
@@ -66,7 +66,7 @@ public class ClientScanner extends Abstr
private final byte[] tableName;
private final int scannerTimeout;
private boolean scanMetricsPublished = false;
- private ScannerCaller caller = new ScannerCaller();
+ private RpcRetryingCaller<Result []> caller;
/**
* Create a new ClientScanner for the specified table. An HConnection will
be
@@ -83,6 +83,7 @@ public class ClientScanner extends Abstr
this(conf, scan, tableName, HConnectionManager.getConnection(conf));
}
+
/**
* Create a new ClientScanner for the specified table
* Note that the passed {@link Scan}'s start row maybe changed changed.
@@ -93,8 +94,22 @@ public class ClientScanner extends Abstr
* @param connection Connection identifying the cluster
* @throws IOException
*/
- public ClientScanner(final Configuration conf, final Scan scan,
- final byte[] tableName, HConnection connection) throws IOException {
+ public ClientScanner(final Configuration conf, final Scan scan, final byte[]
tableName,
+ HConnection connection) throws IOException {
+ this(conf, scan, tableName, connection, new
RpcRetryingCallerFactory(conf));
+ }
+
+ /**
+ * Create a new ClientScanner for the specified table Note that the passed
{@link Scan}'s start
+ * row maybe changed changed.
+ * @param conf The {@link Configuration} to use.
+ * @param scan {@link Scan} to use in this scanner
+ * @param tableName The table that we wish to scan
+ * @param connection Connection identifying the cluster
+ * @throws IOException
+ */
+ public ClientScanner(final Configuration conf, final Scan scan, final byte[]
tableName,
+ HConnection connection, RpcRetryingCallerFactory rpcFactory) throws
IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Scan table=" + Bytes.toString(tableName)
+ ", startRow=" + Bytes.toStringBinary(scan.getStartRow()));
@@ -131,6 +146,8 @@ public class ClientScanner extends Abstr
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
}
+ this.caller = rpcFactory.<Result[]> newCaller();
+
// initialize the scanner
nextScanner(false);
}
@@ -180,7 +197,7 @@ public class ClientScanner extends Abstr
// Close the previous scanner if it's open
if (this.callable != null) {
this.callable.setClose();
- this.caller.callWithRetries(callable,
getConnection().getConfiguration());
+ this.caller.callWithRetries(callable);
this.callable = null;
}
@@ -217,7 +234,7 @@ public class ClientScanner extends Abstr
callable = getScannerCallable(localStartKey);
// Open a scanner on the region server starting at the
// beginning of the region
- this.caller.callWithRetries(callable,
getConnection().getConfiguration());
+ this.caller.callWithRetries(callable);
this.currentRegion = callable.getHRegionInfo();
if (this.scanMetrics != null) {
this.scanMetrics.countOfRegions.incrementAndGet();
@@ -277,10 +294,10 @@ public class ClientScanner extends Abstr
// Server returns a null values if scanning is to stop. Else,
// returns an empty array if scanning is to go on and we've just
// exhausted current region.
- values = this.caller.callWithRetries(callable,
getConnection().getConfiguration());
+ values = this.caller.callWithRetries(callable);
if (skipFirst && values != null && values.length == 1) {
skipFirst = false; // Already skipped, unset it before scanning
again
- values = this.caller.callWithRetries(callable,
getConnection().getConfiguration());
+ values = this.caller.callWithRetries(callable);
}
retryAfterOutOfOrderException = true;
} catch (DoNotRetryIOException e) {
@@ -403,7 +420,7 @@ public class ClientScanner extends Abstr
if (callable != null) {
callable.setClose();
try {
- this.caller.callWithRetries(callable,
getConnection().getConfiguration());
+ this.caller.callWithRetries(callable);
} catch (IOException e) {
// We used to catch this error, interpret, and rethrow. However, we
// have since decided that it's not nice for a scanner's close to
Added:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java?rev=1509015&view=auto
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java
(added)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/DelegatingRetryingCallable.java
Wed Jul 31 20:16:43 2013
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+
+public class DelegatingRetryingCallable<T, D extends RetryingCallable<T>>
implements
+ RetryingCallable<T> {
+ protected final D delegate;
+
+ public DelegatingRetryingCallable(D delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public T call() throws Exception {
+ return delegate.call();
+ }
+
+ @Override
+ public void prepare(boolean reload) throws IOException {
+ delegate.prepare(reload);
+ }
+
+ @Override
+ public void throwable(Throwable t, boolean retrying) {
+ delegate.throwable(t, retrying);
+ }
+
+ @Override
+ public String getExceptionMessageAdditionalDetail() {
+ return delegate.getExceptionMessageAdditionalDetail();
+ }
+
+ @Override
+ public long sleep(long pause, int tries) {
+ return delegate.sleep(pause, tries);
+ }
+}
\ No newline at end of file
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java?rev=1509015&r1=1509014&r2=1509015&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
(original)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
Wed Jul 31 20:16:43 2013
@@ -154,6 +154,8 @@ public class HBaseAdmin implements Abort
private boolean aborted;
private boolean cleanupConnectionOnClose = false; // close the connection in
close()
+ private RpcRetryingCallerFactory rpcCallerFactory;
+
/**
* Constructor.
* See {@link #HBaseAdmin(HConnection connection)}
@@ -186,6 +188,7 @@ public class HBaseAdmin implements Abort
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
this.retryLongerMultiplier = this.conf.getInt(
"hbase.client.retries.longer.multiplier", 10);
+ this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf);
}
/**
@@ -2653,10 +2656,9 @@ public class HBaseAdmin implements Abort
*/
abstract static class MasterAdminCallable<V> extends MasterCallable<V> {
protected MasterAdminKeepAliveConnection masterAdmin;
- private final HConnection connection;
public MasterAdminCallable(final HConnection connection) {
- this.connection = connection;
+ super(connection);
}
@Override
@@ -2675,10 +2677,9 @@ public class HBaseAdmin implements Abort
*/
abstract static class MasterMonitorCallable<V> extends MasterCallable<V> {
protected MasterMonitorKeepAliveConnection masterMonitor;
- private final HConnection connection;
public MasterMonitorCallable(final HConnection connection) {
- this.connection = connection;
+ super(connection);
}
@Override
@@ -2698,6 +2699,12 @@ public class HBaseAdmin implements Abort
* @param <V>
*/
abstract static class MasterCallable<V> implements RetryingCallable<V>,
Closeable {
+ protected HConnection connection;
+
+ public MasterCallable(final HConnection connection) {
+ this.connection = connection;
+ }
+
@Override
public void throwable(Throwable t, boolean retrying) {
}
@@ -2714,9 +2721,9 @@ public class HBaseAdmin implements Abort
}
private <V> V executeCallable(MasterCallable<V> callable) throws IOException
{
- RpcRetryingCaller<V> caller = new RpcRetryingCaller<V>();
+ RpcRetryingCaller<V> caller = rpcCallerFactory.newCaller();
try {
- return caller.callWithRetries(callable, getConfiguration());
+ return caller.callWithRetries(callable);
} finally {
callable.close();
}
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java?rev=1509015&r1=1509014&r2=1509015&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
(original)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HConnectionManager.java
Wed Jul 31 20:16:43 2013
@@ -69,7 +69,6 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
import
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
-import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.TableSchema;
import
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnRequest;
import
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AddColumnResponse;
import
org.apache.hadoop.hbase.protobuf.generated.MasterAdminProtos.AssignRegionRequest;
@@ -2100,7 +2099,8 @@ public class HConnectionManager {
// For tests.
protected <R> AsyncProcess createAsyncProcess(byte[] tableName,
ExecutorService pool,
AsyncProcess.AsyncProcessCallback<R> callback, Configuration conf) {
- return new AsyncProcess<R>(this, tableName, pool, callback, conf);
+ return new AsyncProcess<R>(this, tableName, pool, callback, conf,
+ RpcRetryingCallerFactory.instantiate(conf));
}
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1509015&r1=1509014&r2=1509015&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
(original)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
Wed Jul 31 20:16:43 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.util.Threads;
import java.io.Closeable;
@@ -135,6 +136,7 @@ public class HTable implements HTableInt
/** The Async process for puts with autoflush set to false or multiputs */
protected AsyncProcess<Object> ap;
+ private RpcRetryingCallerFactory rpcCallerFactory;
/**
* Creates an object to access a HBase table.
@@ -267,7 +269,9 @@ public class HTable implements HTableInt
HConstants.HBASE_CLIENT_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
- ap = new AsyncProcess<Object>(connection, tableName, pool, null,
configuration);
+ this.rpcCallerFactory =
RpcRetryingCallerFactory.instantiate(configuration);
+ ap = new AsyncProcess<Object>(connection, tableName, pool, null,
+ configuration, rpcCallerFactory);
this.maxKeyValueSize = this.configuration.getInt(
"hbase.client.keyvalue.maxsize", -1);
@@ -596,9 +600,8 @@ public class HTable implements HTableInt
getLocation().getRegionInfo().getRegionName(), row, family);
}
};
- return new RpcRetryingCaller<Result>().
- callWithRetries(callable, getConfiguration(), this.operationTimeout);
- }
+ return rpcCallerFactory.<Result> newCaller().callWithRetries(callable,
this.operationTimeout);
+ }
/**
* {@inheritDoc}
@@ -643,8 +646,7 @@ public class HTable implements HTableInt
return ProtobufUtil.get(getStub(),
getLocation().getRegionInfo().getRegionName(), get);
}
};
- return new RpcRetryingCaller<Result>().
- callWithRetries(callable, getConfiguration(), this.operationTimeout);
+ return rpcCallerFactory.<Result> newCaller().callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -719,8 +721,7 @@ public class HTable implements HTableInt
}
}
};
- new RpcRetryingCaller<Boolean>().
- callWithRetries(callable, getConfiguration(), this.operationTimeout);
+ rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -856,8 +857,7 @@ public class HTable implements HTableInt
return null;
}
};
- new RpcRetryingCaller<Void>().
- callWithRetries(callable, getConfiguration(), this.operationTimeout);
+ rpcCallerFactory.<Void> newCaller().callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -884,8 +884,7 @@ public class HTable implements HTableInt
}
}
};
- return new RpcRetryingCaller<Result>().
- callWithRetries(callable, getConfiguration(), this.operationTimeout);
+ return rpcCallerFactory.<Result> newCaller().callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -911,8 +910,7 @@ public class HTable implements HTableInt
}
}
};
- return new RpcRetryingCaller<Result>().
- callWithRetries(callable, getConfiguration(), this.operationTimeout);
+ return rpcCallerFactory.<Result> newCaller().callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -962,8 +960,7 @@ public class HTable implements HTableInt
}
}
};
- return new RpcRetryingCaller<Long>().
- callWithRetries(callable, getConfiguration(), this.operationTimeout);
+ return rpcCallerFactory.<Long> newCaller().callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -988,8 +985,7 @@ public class HTable implements HTableInt
}
}
};
- return new RpcRetryingCaller<Boolean>().
- callWithRetries(callable, getConfiguration(), this.operationTimeout);
+ return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable,
this.operationTimeout);
}
@@ -1015,8 +1011,7 @@ public class HTable implements HTableInt
}
}
};
- return new RpcRetryingCaller<Boolean>().
- callWithRetries(callable, getConfiguration(), this.operationTimeout);
+ return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -1037,8 +1032,7 @@ public class HTable implements HTableInt
}
}
};
- return new RpcRetryingCaller<Boolean>().
- callWithRetries(callable, getConfiguration(), this.operationTimeout);
+ return rpcCallerFactory.<Boolean> newCaller().callWithRetries(callable,
this.operationTimeout);
}
/**
@@ -1144,8 +1138,8 @@ public class HTable implements HTableInt
}
}
};
- return new RpcRetryingCaller<List<Boolean>>().
- callWithRetries(callable, getConfiguration(), operationTimeout);
+ return rpcCallerFactory.<List<Boolean>>
newCaller().callWithRetries(callable,
+ operationTimeout);
}
};
futures.put(getsByRegionEntry.getKey(), pool.submit(callable));
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java?rev=1509015&r1=1509014&r2=1509015&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
(original)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCaller.java
Wed Jul 31 20:16:43 2013
@@ -62,8 +62,16 @@ public class RpcRetryingCaller<T> {
private long startTime, endTime;
private final static int MIN_RPC_TIMEOUT = 2000;
- public RpcRetryingCaller() {
+ private final long pause;
+ private final int retries;
+
+ public RpcRetryingCaller(Configuration conf) {
super();
+ this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
+ HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+ this.retries =
+ conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
+ HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
}
private void beforeCall() {
@@ -83,32 +91,20 @@ public class RpcRetryingCaller<T> {
this.endTime = EnvironmentEdgeManager.currentTimeMillis();
}
- public synchronized T callWithRetries(RetryingCallable<T> callable, final
Configuration conf)
- throws IOException, RuntimeException {
- return callWithRetries(callable, conf,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- }
-
- public synchronized T callWithRetries(RetryingCallable<T> callable, final
Configuration conf,
- final int callTimeout)
- throws IOException, RuntimeException {
- final long pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
- HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
- final int numRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- return callWithRetries(callable, callTimeout, pause, numRetries);
+ public synchronized T callWithRetries(RetryingCallable<T> callable) throws
IOException,
+ RuntimeException {
+ return callWithRetries(callable,
HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
}
/**
* Retries if invocation fails.
- * @param conf
* @param callTimeout Timeout for this call
* @param callable The {@link RetryingCallable} to run.
* @return an object of type T
* @throws IOException if a remote or network exception occurs
* @throws RuntimeException other unspecified error
*/
- synchronized T callWithRetries(RetryingCallable<T> callable, int
callTimeout, final long pause,
- final int retries)
+ public synchronized T callWithRetries(RetryingCallable<T> callable, int
callTimeout)
throws IOException, RuntimeException {
this.callTimeout = callTimeout;
List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
Added:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java?rev=1509015&view=auto
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
(added)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerFactory.java
Wed Jul 31 20:16:43 2013
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.util.ReflectionUtils;
+
+/**
+ * Factory to create an {@link RpcRetryingCaller}
+ */
+public class RpcRetryingCallerFactory {
+
+ /** Configuration key for a custom {@link RpcRetryingCaller} */
+ public static final String CUSTOM_CALLER_CONF_KEY =
"hbase.rpc.callerfactory.class";
+ protected final Configuration conf;
+
+ public RpcRetryingCallerFactory(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public <T> RpcRetryingCaller<T> newCaller() {
+ return new RpcRetryingCaller<T>(conf);
+ }
+
+ public static RpcRetryingCallerFactory instantiate(Configuration
configuration) {
+ String rpcCallerFactoryClazz =
+ configuration.get(RpcRetryingCallerFactory.CUSTOM_CALLER_CONF_KEY,
+ RpcRetryingCallerFactory.class.getName());
+ return ReflectionUtils.instantiateWithCustomCtor(rpcCallerFactoryClazz,
+ new Class[] { Configuration.class }, new Object[] { configuration });
+ }
+}
\ No newline at end of file
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java?rev=1509015&r1=1509014&r2=1509015&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
(original)
+++
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RegionCoprocessorRpcChannel.java
Wed Jul 31 20:16:43 2013
@@ -25,7 +25,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.RegionServerCallable;
-import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos;
import
org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceResponse;
@@ -52,10 +52,13 @@ public class RegionCoprocessorRpcChannel
private final byte[] row;
private byte[] lastRegion;
+ private RpcRetryingCallerFactory rpcFactory;
+
public RegionCoprocessorRpcChannel(HConnection conn, byte[] table, byte[]
row) {
this.connection = conn;
this.table = table;
this.row = row;
+ this.rpcFactory =
RpcRetryingCallerFactory.instantiate(conn.getConfiguration());
}
@Override
@@ -83,8 +86,8 @@ public class RegionCoprocessorRpcChannel
return ProtobufUtil.execService(getStub(), call, regionName);
}
};
- CoprocessorServiceResponse result = new
RpcRetryingCaller<CoprocessorServiceResponse>().
- callWithRetries(callable, this.connection.getConfiguration());
+ CoprocessorServiceResponse result =
rpcFactory.<CoprocessorServiceResponse> newCaller()
+ .callWithRetries(callable);
Message response = null;
if (result.getValue().hasValue()) {
response = responsePrototype.newBuilderForType()
Modified:
hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java?rev=1509015&r1=1509014&r2=1509015&view=diff
==============================================================================
---
hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
(original)
+++
hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
Wed Jul 31 20:16:43 2013
@@ -51,7 +51,7 @@ public class TestAsyncProcess {
private static final byte[] DUMMY_BYTES_1 = "DUMMY_BYTES_1".getBytes();
private static final byte[] DUMMY_BYTES_2 = "DUMMY_BYTES_2".getBytes();
private static final byte[] FAILS = "FAILS".getBytes();
- private Configuration conf = new Configuration();
+ private static final Configuration conf = new Configuration();
private static ServerName sn = new ServerName("localhost:10,1254");
@@ -67,13 +67,13 @@ public class TestAsyncProcess {
public MyAsyncProcess(HConnection hc, AsyncProcessCallback<Res> callback,
Configuration conf) {
super(hc, DUMMY_TABLE, new ThreadPoolExecutor(1, 10, 60,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
Threads.newDaemonThreadFactory("test-TestAsyncProcess")),
- callback, conf);
+ callback, conf, new RpcRetryingCallerFactory(conf));
}
@Override
protected RpcRetryingCaller<MultiResponse>
createCaller(MultiServerCallable<Row> callable) {
final MultiResponse mr = createMultiResponse(callable.getLocation(),
callable.getMulti());
- return new RpcRetryingCaller<MultiResponse>() {
+ return new RpcRetryingCaller<MultiResponse>(conf) {
@Override
public MultiResponse callWithoutRetries(
RetryingCallable<MultiResponse> callable)
throws IOException, RuntimeException {
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java?rev=1509015&r1=1509014&r2=1509015&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
(original)
+++
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/LoadIncrementalHFiles.java
Wed Jul 31 20:16:43 2013
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.client.HC
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.coprocessor.SecureBulkLoadClient;
import org.apache.hadoop.hbase.io.HalfStoreFileReader;
import org.apache.hadoop.hbase.io.Reference;
@@ -588,7 +589,9 @@ public class LoadIncrementalHFiles exten
try {
List<LoadQueueItem> toRetry = new ArrayList<LoadQueueItem>();
- boolean success = new
RpcRetryingCaller<Boolean>().callWithRetries(svrCallable, getConf());
+ Configuration conf = getConf();
+ boolean success = RpcRetryingCallerFactory.instantiate(conf).<Boolean>
newCaller()
+ .callWithRetries(svrCallable);
if (!success) {
LOG.warn("Attempt to bulk load region containing "
+ Bytes.toStringBinary(first) + " into table "
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java?rev=1509015&r1=1509014&r2=1509015&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
(original)
+++
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEditsReplaySink.java
Wed Jul 31 20:16:43 2013
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.HC
import org.apache.hadoop.hbase.client.RegionServerCallable;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
@@ -162,9 +163,10 @@ public class WALEditsReplaySink {
private void replayEdits(final HRegionLocation regionLoc, final HRegionInfo
regionInfo,
final List<Action<Row>> actions) throws IOException {
try {
+ RpcRetryingCallerFactory factory =
RpcRetryingCallerFactory.instantiate(conf);
ReplayServerCallable<MultiResponse> callable = new
ReplayServerCallable<MultiResponse>(
this.conn, this.tableName, regionLoc, regionInfo, actions);
- new RpcRetryingCaller<MultiResponse>().callWithRetries(callable, conf,
this.replayTimeout);
+ factory.<MultiResponse> newCaller().callWithRetries(callable,
this.replayTimeout);
} catch (IOException ie) {
if (skipErrors) {
LOG.warn(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS
Modified:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
URL:
http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java?rev=1509015&r1=1509014&r2=1509015&view=diff
==============================================================================
---
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
(original)
+++
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionServerBulkLoad.java
Wed Jul 31 20:16:43 2013
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RpcRetryingCaller;
+import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.io.compress.Compression;
@@ -157,8 +158,9 @@ public class TestHRegionServerBulkLoad {
return null;
}
};
- RpcRetryingCaller<Void> caller = new RpcRetryingCaller<Void>();
- caller.callWithRetries(callable, UTIL.getConfiguration());
+ RpcRetryingCallerFactory factory = new RpcRetryingCallerFactory(conf);
+ RpcRetryingCaller<Void> caller = factory.<Void> newCaller();
+ caller.callWithRetries(callable);
// Periodically do compaction to reduce the number of open file handles.
if (numBulkLoads.get() % 10 == 0) {
@@ -178,7 +180,7 @@ public class TestHRegionServerBulkLoad {
return null;
}
};
- caller.callWithRetries(callable, UTIL.getConfiguration());
+ caller.callWithRetries(callable);
}
}
}