HBASE-13204 Procedure v2 - client create/delete table sync
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8725526d Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8725526d Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8725526d Branch: refs/heads/hbase-12439 Commit: 8725526db1192407c1b45aab2dafc9809dbb0b32 Parents: a7f9b6d Author: Matteo Bertozzi <[email protected]> Authored: Thu Apr 9 21:01:20 2015 +0100 Committer: Matteo Bertozzi <[email protected]> Committed: Thu Apr 9 21:01:50 2015 +0100 ---------------------------------------------------------------------- .../hbase/client/ConnectionImplementation.java | 6 + .../apache/hadoop/hbase/client/HBaseAdmin.java | 608 ++++- .../hbase/client/TestProcedureFuture.java | 186 ++ .../hbase/protobuf/generated/MasterProtos.java | 2576 +++++++++++++++--- hbase-protocol/src/main/protobuf/Master.proto | 24 + .../org/apache/hadoop/hbase/master/HMaster.java | 12 +- .../hadoop/hbase/master/MasterRpcServices.java | 51 +- .../hadoop/hbase/master/MasterServices.java | 4 +- .../hadoop/hbase/master/TestCatalogJanitor.java | 7 +- 9 files changed, 2920 insertions(+), 554 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/8725526d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java index 8442a77..bc2d51a 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java @@ -1598,6 +1598,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable { } @Override + public 
MasterProtos.GetProcedureResultResponse getProcedureResult(RpcController controller, + MasterProtos.GetProcedureResultRequest request) throws ServiceException { + return stub.getProcedureResult(controller, request); + } + + @Override public MasterProtos.IsMasterRunningResponse isMasterRunning( RpcController controller, MasterProtos.IsMasterRunningRequest request) throws ServiceException { http://git-wip-us.apache.org/repos/asf/hbase/blob/8725526d/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java index 21a9139..1697c03 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java @@ -31,6 +31,10 @@ import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.regex.Pattern; import org.apache.commons.logging.Log; @@ -62,6 +66,7 @@ import org.apache.hadoop.hbase.ZooKeeperConnectionException; import org.apache.hadoop.hbase.classification.InterfaceAudience; import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.exceptions.TimeoutIOException; import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.MasterCoprocessorRpcChannel; import org.apache.hadoop.hbase.ipc.RegionServerCoprocessorRpcChannel; @@ -89,10 +94,12 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AddColumnRequest; 
import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.AssignRegionRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateNamespaceRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.CreateTableResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteColumnRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteNamespaceRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteSnapshotRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DeleteTableResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DisableTableRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.DispatchMergingRegionsRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.EnableTableRequest; @@ -101,6 +108,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.ExecProcedureResp import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusRequest; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetSchemaAlterStatusResponse; import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetTableDescriptorsRequest; @@ -142,6 +151,7 @@ import org.apache.hadoop.hbase.snapshot.UnknownSnapshotException; import org.apache.hadoop.hbase.util.Addressing; 
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ForeignExceptionUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; @@ -186,6 +196,7 @@ public class HBaseAdmin implements Admin { // numRetries is for 'normal' stuff... Multiply by this factor when // want to wait a long time. private final int retryLongerMultiplier; + private final int syncWaitTimeout; private boolean aborted; private boolean cleanupConnectionOnClose = false; // close the connection in close() private boolean closed = false; @@ -242,6 +253,8 @@ public class HBaseAdmin implements Admin { "hbase.client.retries.longer.multiplier", 10); this.operationTimeout = this.conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT); + this.syncWaitTimeout = this.conf.getInt( + "hbase.client.sync.wait.timeout.msec", 10 * 60000); // 10min this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(this.conf); } @@ -541,92 +554,23 @@ public class HBaseAdmin implements Admin { */ @Override public void createTable(final HTableDescriptor desc, byte [][] splitKeys) - throws IOException { + throws IOException { + Future<Void> future = createTableAsyncV2(desc, splitKeys); try { - createTableAsync(desc, splitKeys); - } catch (SocketTimeoutException ste) { - LOG.warn("Creating " + desc.getTableName() + " took too long", ste); - } - int numRegs = (splitKeys == null ? 
1 : splitKeys.length + 1) * desc.getRegionReplication(); - int prevRegCount = 0; - boolean tableWasEnabled = false; - for (int tries = 0; tries < this.numRetries * this.retryLongerMultiplier; - ++tries) { - if (tableWasEnabled) { - // Wait all table regions comes online - final AtomicInteger actualRegCount = new AtomicInteger(0); - MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { - @Override - public boolean visit(Result rowResult) throws IOException { - RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult); - if (list == null) { - LOG.warn("No serialized HRegionInfo in " + rowResult); - return true; - } - HRegionLocation l = list.getRegionLocation(); - if (l == null) { - return true; - } - if (!l.getRegionInfo().getTable().equals(desc.getTableName())) { - return false; - } - if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true; - HRegionLocation[] locations = list.getRegionLocations(); - for (HRegionLocation location : locations) { - if (location == null) continue; - ServerName serverName = location.getServerName(); - // Make sure that regions are assigned to server - if (serverName != null && serverName.getHostAndPort() != null) { - actualRegCount.incrementAndGet(); - } - } - return true; - } - }; - MetaTableAccessor.scanMetaForTableRegions(connection, visitor, desc.getTableName()); - if (actualRegCount.get() < numRegs) { - if (tries == this.numRetries * this.retryLongerMultiplier - 1) { - throw new RegionOfflineException("Only " + actualRegCount.get() + - " of " + numRegs + " regions are online; retries exhausted."); - } - try { // Sleep - Thread.sleep(getPauseTime(tries)); - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted when opening" + - " regions; " + actualRegCount.get() + " of " + numRegs + - " regions processed so far"); - } - if (actualRegCount.get() > prevRegCount) { // Making progress - prevRegCount = actualRegCount.get(); - tries = -1; - } - } else { - 
return; - } + // TODO: how long should we wait? spin forever? + future.get(syncWaitTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted when waiting" + + " for table to be enabled; meta scan was done"); + } catch (TimeoutException e) { + throw new TimeoutIOException(e); + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException)e.getCause(); } else { - try { - tableWasEnabled = isTableAvailable(desc.getTableName()); - } catch (TableNotFoundException tnfe) { - LOG.debug( - "Table " + desc.getTableName() + " was not enabled, sleeping, still " + numRetries - + " retries left"); - } - if (tableWasEnabled) { - // no we will scan meta to ensure all regions are online - tries = -1; - } else { - try { // Sleep - Thread.sleep(getPauseTime(tries)); - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted when waiting" + - " for table to be enabled; meta scan was done"); - } - } + throw new IOException(e.getCause()); } } - throw new TableNotEnabledException( - "Retries exhausted while still waiting for table: " - + desc.getTableName() + " to be enabled"); } /** @@ -646,22 +590,42 @@ public class HBaseAdmin implements Admin { * @throws IOException */ @Override - public void createTableAsync( - final HTableDescriptor desc, final byte [][] splitKeys) - throws IOException { - if(desc.getTableName() == null) { + public void createTableAsync(final HTableDescriptor desc, final byte [][] splitKeys) + throws IOException { + createTableAsyncV2(desc, splitKeys); + } + + /** + * Creates a new table but does not block and wait for it to come online. + * You can use Future.get(long, TimeUnit) to wait on the operation to complete. + * It may throw ExecutionException if there was an error while executing the operation + * or TimeoutException in case the wait timeout was not long enough to allow the + * operation to complete. 
+ * + * @param desc table descriptor for table + * @param splitKeys keys to check if the table has been created with all split keys + * @throws IllegalArgumentException Bad table name, if the split keys + * are repeated and if the split key has empty byte array. + * @throws IOException if a remote or network exception occurs + * @return the result of the async creation. You can use Future.get(long, TimeUnit) + * to wait on the operation to complete. + */ + // TODO: This should be called Async but it will break binary compatibility + private Future<Void> createTableAsyncV2(final HTableDescriptor desc, final byte[][] splitKeys) + throws IOException { + if (desc.getTableName() == null) { throw new IllegalArgumentException("TableName cannot be null"); } - if(splitKeys != null && splitKeys.length > 0) { + if (splitKeys != null && splitKeys.length > 0) { Arrays.sort(splitKeys, Bytes.BYTES_COMPARATOR); // Verify there are no duplicate split keys - byte [] lastKey = null; - for(byte [] splitKey : splitKeys) { + byte[] lastKey = null; + for (byte[] splitKey : splitKeys) { if (Bytes.compareTo(splitKey, HConstants.EMPTY_BYTE_ARRAY) == 0) { throw new IllegalArgumentException( "Empty split key must not be passed in the split keys."); } - if(lastKey != null && Bytes.equals(splitKey, lastKey)) { + if (lastKey != null && Bytes.equals(splitKey, lastKey)) { throw new IllegalArgumentException("All split keys must be unique, " + "found duplicate: " + Bytes.toStringBinary(splitKey) + ", " + Bytes.toStringBinary(lastKey)); @@ -670,14 +634,127 @@ public class HBaseAdmin implements Admin { } } - executeCallable(new MasterCallable<Void>(getConnection()) { + CreateTableResponse response = executeCallable( + new MasterCallable<CreateTableResponse>(getConnection()) { @Override - public Void call(int callTimeout) throws ServiceException { + public CreateTableResponse call(int callTimeout) throws ServiceException { CreateTableRequest request = RequestConverter.buildCreateTableRequest(desc, 
splitKeys); - master.createTable(null, request); - return null; + return master.createTable(null, request); } }); + return new CreateTableFuture(this, desc, splitKeys, response); + } + + private static class CreateTableFuture extends ProcedureFuture<Void> { + private final HTableDescriptor desc; + private final byte[][] splitKeys; + + public CreateTableFuture(final HBaseAdmin admin, final HTableDescriptor desc, + final byte[][] splitKeys, final CreateTableResponse response) { + super(admin, (response != null && response.hasProcId()) ? response.getProcId() : null); + this.splitKeys = splitKeys; + this.desc = desc; + } + + @Override + protected Void waitOperationResult(final long deadlineTs) + throws IOException, TimeoutException { + waitForTableEnabled(deadlineTs); + waitForAllRegionsOnline(deadlineTs); + return null; + } + + @Override + protected Void postOperationResult(final Void result, final long deadlineTs) + throws IOException, TimeoutException { + LOG.info("Created " + desc.getTableName()); + return result; + } + + private void waitForTableEnabled(final long deadlineTs) + throws IOException, TimeoutException { + waitForState(deadlineTs, new WaitForStateCallable() { + @Override + public boolean checkState(int tries) throws IOException { + try { + if (getAdmin().isTableAvailable(desc.getTableName())) { + return true; + } + } catch (TableNotFoundException tnfe) { + LOG.debug("Table "+ desc.getTableName() +" was not enabled, sleeping. 
tries="+ tries); + } + return false; + } + + @Override + public void throwInterruptedException() throws InterruptedIOException { + throw new InterruptedIOException("Interrupted when waiting for table " + + desc.getTableName() + " to be enabled"); + } + + @Override + public void throwTimeoutException(long elapsedTime) throws TimeoutException { + throw new TimeoutException("Table " + desc.getTableName() + + " not enabled after " + elapsedTime + "msec"); + } + }); + } + + private void waitForAllRegionsOnline(final long deadlineTs) + throws IOException, TimeoutException { + final AtomicInteger actualRegCount = new AtomicInteger(0); + final MetaTableAccessor.Visitor visitor = new MetaTableAccessor.Visitor() { + @Override + public boolean visit(Result rowResult) throws IOException { + RegionLocations list = MetaTableAccessor.getRegionLocations(rowResult); + if (list == null) { + LOG.warn("No serialized HRegionInfo in " + rowResult); + return true; + } + HRegionLocation l = list.getRegionLocation(); + if (l == null) { + return true; + } + if (!l.getRegionInfo().getTable().equals(desc.getTableName())) { + return false; + } + if (l.getRegionInfo().isOffline() || l.getRegionInfo().isSplit()) return true; + HRegionLocation[] locations = list.getRegionLocations(); + for (HRegionLocation location : locations) { + if (location == null) continue; + ServerName serverName = location.getServerName(); + // Make sure that regions are assigned to server + if (serverName != null && serverName.getHostAndPort() != null) { + actualRegCount.incrementAndGet(); + } + } + return true; + } + }; + + int tries = 0; + IOException serverEx = null; + int numRegs = (splitKeys == null ? 
1 : splitKeys.length + 1) * desc.getRegionReplication(); + while (EnvironmentEdgeManager.currentTime() < deadlineTs) { + actualRegCount.set(0); + MetaTableAccessor.scanMetaForTableRegions( + getAdmin().getConnection(), visitor, desc.getTableName()); + if (actualRegCount.get() == numRegs) { + // all the regions are online + return; + } + + try { + Thread.sleep(getAdmin().getPauseTime(tries++)); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted when opening" + + " regions; " + actualRegCount.get() + " of " + numRegs + + " regions processed so far"); + } + } + throw new TimeoutException("Only " + actualRegCount.get() + + " of " + numRegs + " regions are online; retries exhausted."); + } } public void deleteTable(final String tableName) throws IOException { @@ -697,48 +774,93 @@ public class HBaseAdmin implements Admin { */ @Override public void deleteTable(final TableName tableName) throws IOException { - boolean tableExists = true; + Future<Void> future = deleteTableAsyncV2(tableName); + try { + future.get(syncWaitTimeout, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new InterruptedIOException("Interrupted when waiting for table to be deleted"); + } catch (TimeoutException e) { + throw new TimeoutIOException(e); + } catch (ExecutionException e) { + if (e.getCause() instanceof IOException) { + throw (IOException)e.getCause(); + } else { + throw new IOException(e.getCause()); + } + } + } - executeCallable(new MasterCallable<Void>(getConnection()) { + /** + * Deletes the table but does not block and wait for it to be completely removed. + * You can use Future.get(long, TimeUnit) to wait on the operation to complete. + * It may throw ExecutionException if there was an error while executing the operation + * or TimeoutException in case the wait timeout was not long enough to allow the + * operation to complete.
+ * + * @param desc table descriptor for table + * @param tableName name of table to delete + * @throws IOException if a remote or network exception occurs + * @return the result of the async delete. You can use Future.get(long, TimeUnit) + * to wait on the operation to complete. + */ + // TODO: This should be called Async but it will break binary compatibility + private Future<Void> deleteTableAsyncV2(final TableName tableName) throws IOException { + DeleteTableResponse response = executeCallable( + new MasterCallable<DeleteTableResponse>(getConnection()) { @Override - public Void call(int callTimeout) throws ServiceException { + public DeleteTableResponse call(int callTimeout) throws ServiceException { DeleteTableRequest req = RequestConverter.buildDeleteTableRequest(tableName); - master.deleteTable(null,req); - return null; + return master.deleteTable(null,req); } }); + return new DeleteTableFuture(this, tableName, response); + } - int failures = 0; - for (int tries = 0; tries < (this.numRetries * this.retryLongerMultiplier); tries++) { - try { - tableExists = tableExists(tableName); - if (!tableExists) - break; - } catch (IOException ex) { - failures++; - if(failures >= numRetries - 1) { // no more tries left - if (ex instanceof RemoteException) { - throw ((RemoteException) ex).unwrapRemoteException(); - } else { - throw ex; - } - } - } - try { - Thread.sleep(getPauseTime(tries)); - } catch (InterruptedException e) { - throw new InterruptedIOException("Interrupted when waiting" + - " for table to be deleted"); - } + private static class DeleteTableFuture extends ProcedureFuture<Void> { + private final TableName tableName; + + public DeleteTableFuture(final HBaseAdmin admin, final TableName tableName, + final DeleteTableResponse response) { + super(admin, (response != null && response.hasProcId()) ? 
response.getProcId() : null); + this.tableName = tableName; + } + + @Override + protected Void waitOperationResult(final long deadlineTs) + throws IOException, TimeoutException { + waitTableNotFound(deadlineTs); + return null; + } + + @Override + protected Void postOperationResult(final Void result, final long deadlineTs) + throws IOException, TimeoutException { + // Delete cached information to prevent clients from using old locations + getAdmin().getConnection().clearRegionCache(tableName); + LOG.info("Deleted " + tableName); + return result; } - if (tableExists) { - throw new IOException("Retries exhausted, it took too long to wait"+ - " for the table " + tableName + " to be deleted."); + private void waitTableNotFound(final long deadlineTs) + throws IOException, TimeoutException { + waitForState(deadlineTs, new WaitForStateCallable() { + @Override + public boolean checkState(int tries) throws IOException { + return !getAdmin().tableExists(tableName); + } + + @Override + public void throwInterruptedException() throws InterruptedIOException { + throw new InterruptedIOException("Interrupted when waiting for table to be deleted"); + } + + @Override + public void throwTimeoutException(long elapsedTime) throws TimeoutException { + throw new TimeoutException("Table " + tableName + " not yet deleted after " + + elapsedTime + "msec"); + } + }); } - // Delete cached information to prevent clients from using old locations - this.connection.clearRegionCache(tableName); - LOG.info("Deleted " + tableName); } /** @@ -3834,4 +3956,236 @@ public class HBaseAdmin implements Admin { } }); } + + /** + * Future that waits on a procedure result. + * Returned by the async version of the Admin calls, + * and used internally by the sync calls to wait on the result of the procedure. 
+ */ + @InterfaceAudience.Private + @InterfaceStability.Evolving + protected static class ProcedureFuture<V> implements Future<V> { + private ExecutionException exception = null; + private boolean procResultFound = false; + private boolean done = false; + private V result = null; + + private final HBaseAdmin admin; + private final Long procId; + + public ProcedureFuture(final HBaseAdmin admin, final Long procId) { + this.admin = admin; + this.procId = procId; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isCancelled() { + // TODO: Abort not implemented yet + return false; + } + + @Override + public V get() throws InterruptedException, ExecutionException { + // TODO: should we ever spin forever? + throw new UnsupportedOperationException(); + } + + @Override + public V get(long timeout, TimeUnit unit) + throws InterruptedException, ExecutionException, TimeoutException { + if (!done) { + long deadlineTs = EnvironmentEdgeManager.currentTime() + unit.toMillis(timeout); + try { + try { + // if the master support procedures, try to wait the result + if (procId != null) { + result = waitProcedureResult(procId, deadlineTs); + } + // if we don't have a proc result, try the compatibility wait + if (!procResultFound) { + result = waitOperationResult(deadlineTs); + } + result = postOperationResult(result, deadlineTs); + done = true; + } catch (IOException e) { + result = postOpeartionFailure(e, deadlineTs); + done = true; + } + } catch (IOException e) { + exception = new ExecutionException(e); + done = true; + } + } + if (exception != null) { + throw exception; + } + return result; + } + + @Override + public boolean isDone() { + return done; + } + + protected HBaseAdmin getAdmin() { + return admin; + } + + private V waitProcedureResult(long procId, long deadlineTs) + throws IOException, TimeoutException, InterruptedException { + GetProcedureResultRequest request = 
GetProcedureResultRequest.newBuilder() + .setProcId(procId) + .build(); + + int tries = 0; + IOException serviceEx = null; + while (EnvironmentEdgeManager.currentTime() < deadlineTs) { + GetProcedureResultResponse response = null; + try { + // Try to fetch the result + response = getProcedureResult(request); + } catch (IOException e) { + serviceEx = unwrapException(e); + + // the master may be down + LOG.warn("failed to get the procedure result procId=" + procId, serviceEx); + + // Not much to do, if we have a DoNotRetryIOException + if (serviceEx instanceof DoNotRetryIOException) { + // TODO: looks like there is no way to unwrap this exception and get the proper + // UnsupportedOperationException aside from looking at the message. + // anyway, if we fail here we just failover to the compatibility side + // and that is always a valid solution. + LOG.warn("Proc-v2 is unsupported on this master: " + serviceEx.getMessage(), serviceEx); + procResultFound = false; + return null; + } + } + + // If the procedure is no longer running, we should have a result + if (response != null && response.getState() != GetProcedureResultResponse.State.RUNNING) { + procResultFound = response.getState() != GetProcedureResultResponse.State.NOT_FOUND; + return convertResult(response); + } + + try { + Thread.sleep(getAdmin().getPauseTime(tries++)); + } catch (InterruptedException e) { + throw new InterruptedException( + "Interrupted while waiting for the result of proc " + procId); + } + } + if (serviceEx != null) { + throw serviceEx; + } else { + throw new TimeoutException("The procedure " + procId + " is still running"); + } + } + + private static IOException unwrapException(IOException e) { + if (e instanceof RemoteException) { + return ((RemoteException)e).unwrapRemoteException(); + } + return e; + } + + protected GetProcedureResultResponse getProcedureResult(final GetProcedureResultRequest request) + throws IOException { + return admin.executeCallable(new 
MasterCallable<GetProcedureResultResponse>( + admin.getConnection()) { + @Override + public GetProcedureResultResponse call(int callTimeout) throws ServiceException { + return master.getProcedureResult(null, request); + } + }); + } + + /** + * Convert the procedure result response to a specified type. + * @param response the procedure result object to parse + * @return the result data of the procedure. + */ + protected V convertResult(final GetProcedureResultResponse response) throws IOException { + if (response.hasException()) { + throw ForeignExceptionUtil.toIOException(response.getException()); + } + return null; + } + + /** + * Fallback implementation in case the procedure is not supported by the server. + * It should try to wait until the operation is completed. + * @param deadlineTs the timestamp after which this method should throw a TimeoutException + * @return the result data of the operation + */ + protected V waitOperationResult(final long deadlineTs) + throws IOException, TimeoutException { + return null; + } + + /** + * Called after the operation is completed and the result fetched. + * this allows to perform extra steps after the procedure is completed. + * it allows to apply transformations to the result that will be returned by get(). + * @param result the result of the procedure + * @param deadlineTs the timestamp after which this method should throw a TimeoutException + * @return the result of the procedure, which may be the same as the passed one + */ + protected V postOperationResult(final V result, final long deadlineTs) + throws IOException, TimeoutException { + return result; + } + + /** + * Called after the operation is terminated with a failure. + * this allows to perform extra steps after the procedure is terminated. + * it allows to apply transformations to the result that will be returned by get(). 
+ * The default implementation rethrows the exception. + * @param exception the exception that terminated the operation + * @param deadlineTs the timestamp after which this method should throw a TimeoutException + * @return the result to be returned by get() if the failure is handled here + */ + protected V postOpeartionFailure(final IOException exception, final long deadlineTs) + throws IOException, TimeoutException { + throw exception; + } + + protected interface WaitForStateCallable { + boolean checkState(int tries) throws IOException; + void throwInterruptedException() throws InterruptedIOException; + void throwTimeoutException(long elapsed) throws TimeoutException; + } + + protected void waitForState(final long deadlineTs, final WaitForStateCallable callable) + throws IOException, TimeoutException { + int tries = 0; + IOException serverEx = null; + long startTime = EnvironmentEdgeManager.currentTime(); + while (EnvironmentEdgeManager.currentTime() < deadlineTs) { + serverEx = null; + try { + if (callable.checkState(tries)) { + return; + } + } catch (IOException e) { + serverEx = e; + } + try { + Thread.sleep(getAdmin().getPauseTime(tries++)); + } catch (InterruptedException e) { + callable.throwInterruptedException(); + } + } + if (serverEx != null) { + throw unwrapException(serverEx); + } else { + callable.throwTimeoutException(EnvironmentEdgeManager.currentTime() - startTime); + } + } + } } http://git-wip-us.apache.org/repos/asf/hbase/blob/8725526d/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java ---------------------------------------------------------------------- diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java new file mode 100644 index 0000000..da3ffe9 --- /dev/null +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestProcedureFuture.java @@ -0,0 +1,186 @@ +/** + * + * Licensed to the
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultRequest; +import org.apache.hadoop.hbase.protobuf.generated.MasterProtos.GetProcedureResultResponse; + +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.mockito.Mockito; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the client-side {@code HBaseAdmin.ProcedureFuture} flow. Each test uses a mocked + * {@code HBaseAdmin} and a {@code TestFuture} subclass that records which hooks were invoked. + */ +@Category({ClientTests.class, SmallTests.class}) +public class TestProcedureFuture { + /** + * ProcedureFuture stub in which each overridden hook records that it was called. + * getProcedureResult() always reports FINISHED. convertResult() and waitOperationResult() + * return null, and postOperationResult() returns its input. Individual tests override + * getProcedureResult() to simulate other master behaviors. + */ + private static class TestFuture extends HBaseAdmin.ProcedureFuture<Void> { + private boolean postOperationResultCalled = false; + private boolean waitOperationResultCalled = false; + private boolean getProcedureResultCalled = false; + private boolean
convertResultCalled = false; + + /** @param procId the procedure id returned by the master, or null when it returned none */ + public TestFuture(final HBaseAdmin admin, final Long procId) { + super(admin, procId); + } + + public boolean wasPostOperationResultCalled() { + return postOperationResultCalled; + } + + public boolean wasWaitOperationResultCalled() { + return waitOperationResultCalled; + } + + public boolean wasGetProcedureResultCalled() { + return getProcedureResultCalled; + } + + public boolean wasConvertResultCalled() { + return convertResultCalled; + } + + @Override + protected GetProcedureResultResponse getProcedureResult( + final GetProcedureResultRequest request) throws IOException { + getProcedureResultCalled = true; + return GetProcedureResultResponse.newBuilder() + .setState(GetProcedureResultResponse.State.FINISHED) + .build(); + } + + @Override + protected Void convertResult(final GetProcedureResultResponse response) throws IOException { + convertResultCalled = true; + return null; + } + + @Override + protected Void waitOperationResult(final long deadlineTs) + throws IOException, TimeoutException { + waitOperationResultCalled = true; + return null; + } + + @Override + protected Void postOperationResult(final Void result, final long deadlineTs) + throws IOException, TimeoutException { + postOperationResultCalled = true; + return result; + } + } + + /** + * When the master returns a result with a procId, + * the waitOperationResult() call is skipped, + * since the procedure result is fetched and converted instead.
+ */ + @Test(timeout=60000) + public void testWithProcId() throws Exception { + HBaseAdmin admin = Mockito.mock(HBaseAdmin.class); + TestFuture f = new TestFuture(admin, 100L); + f.get(1, TimeUnit.MINUTES); + + assertTrue("expected getProcedureResult() to be called", f.wasGetProcedureResultCalled()); + assertTrue("expected convertResult() to be called", f.wasConvertResultCalled()); + assertFalse("unexpected waitOperationResult() called", f.wasWaitOperationResultCalled()); + assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled()); + } + + /** + * Verify that the future keeps polling getProcedureResult() while the procedure is RUNNING, + * and completes once it reports FINISHED (on the 10th poll in this test). + */ + @Test(timeout=60000) + public void testWithProcIdAndSpinning() throws Exception { + final AtomicInteger spinCount = new AtomicInteger(0); + HBaseAdmin admin = Mockito.mock(HBaseAdmin.class); + TestFuture f = new TestFuture(admin, 100L) { + @Override + protected GetProcedureResultResponse getProcedureResult( + final GetProcedureResultRequest request) throws IOException { + /* Report RUNNING for the first 9 polls and FINISHED from the 10th poll on. */ + boolean done = spinCount.incrementAndGet() >= 10; + return GetProcedureResultResponse.newBuilder() + .setState(done ? GetProcedureResultResponse.State.FINISHED : + GetProcedureResultResponse.State.RUNNING) + .build(); + } + }; + f.get(1, TimeUnit.MINUTES); + + assertEquals(10, spinCount.get()); + assertTrue("expected convertResult() to be called", f.wasConvertResultCalled()); + assertFalse("unexpected waitOperationResult() called", f.wasWaitOperationResultCalled()); + assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled()); + } + + /** + * When the master returns a result without a procId, getProcedureResult() (and therefore + * convertResult()) is skipped, and waitOperationResult() is used instead.
+ */ + @Test(timeout=60000) + public void testWithoutProcId() throws Exception { + HBaseAdmin admin = Mockito.mock(HBaseAdmin.class); + TestFuture f = new TestFuture(admin, null); + f.get(1, TimeUnit.MINUTES); + + assertFalse("unexpected getProcedureResult() called", f.wasGetProcedureResultCalled()); + assertFalse("unexpected convertResult() called", f.wasConvertResultCalled()); + assertTrue("expected waitOperationResult() to be called", f.wasWaitOperationResultCalled()); + assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled()); + } + + /** + * A new client with procedure support may ask an old master without procedure support + * for the procedure result. In that case it gets a DoNotRetryIOException wrapping an + * UnsupportedOperationException. The future should trap that and fall back to + * waitOperationResult(). + * + * This happens when the operation call is made on a "new master", but while we are waiting + * for the operation to complete, the cluster fails over to an "old master". + */ + @Test(timeout=60000) + public void testOnServerWithNoProcedureSupport() throws Exception { + HBaseAdmin admin = Mockito.mock(HBaseAdmin.class); + TestFuture f = new TestFuture(admin, 100L) { + @Override + protected GetProcedureResultResponse getProcedureResult( + final GetProcedureResultRequest request) throws IOException { + /* Call super only to record the invocation, then fail the way an old master would. */ + super.getProcedureResult(request); + throw new DoNotRetryIOException(new UnsupportedOperationException("getProcedureResult")); + } + }; + f.get(1, TimeUnit.MINUTES); + + assertTrue("expected getProcedureResult() to be called", f.wasGetProcedureResultCalled()); + assertFalse("unexpected convertResult() called", f.wasConvertResultCalled()); + assertTrue("expected waitOperationResult() to be called", f.wasWaitOperationResultCalled()); + assertTrue("expected postOperationResult() to be called", f.wasPostOperationResultCalled()); + } +} \ No newline at end of file
