frostruan commented on code in PR #4246:
URL: https://github.com/apache/hbase/pull/4246#discussion_r844018501
##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java:
##########
@@ -927,33 +930,59 @@ public void run(PRESP resp) {
@Override
public CompletableFuture<Void> flush(TableName tableName, byte[]
columnFamily) {
+ // This is for keeping compatibility with the old implementation.
+ // If the server version is lower than the client version, it's possible that the
+ // flushTable method is not present on the server side; if so, we need to fall back
+ // to the old implementation.
+ FlushTableRequest request = RequestConverter
+ .buildFlushTableRequest(tableName, columnFamily, ng.getNonceGroup(),
ng.newNonce());
+ CompletableFuture<Void> procFuture =
+ this.<FlushTableRequest, FlushTableResponse>procedureCall(tableName,
request,
+ (s, c, req, done) -> s.flushTable(c, req, done), (resp) ->
resp.getProcId(),
+ new FlushTableProcedureBiConsumer(tableName));
+ // here we use another new CompletableFuture because the
+ // procFuture is not fully controlled by ourselves.
CompletableFuture<Void> future = new CompletableFuture<>();
- addListener(tableExists(tableName), (exists, err) -> {
- if (err != null) {
- future.completeExceptionally(err);
- } else if (!exists) {
- future.completeExceptionally(new TableNotFoundException(tableName));
- } else {
- addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
- if (err2 != null) {
- future.completeExceptionally(err2);
- } else if (!tableEnabled) {
- future.completeExceptionally(new
TableNotEnabledException(tableName));
- } else {
- Map<String, String> props = new HashMap<>();
- if (columnFamily != null) {
- props.put(HConstants.FAMILY_KEY_STR,
Bytes.toString(columnFamily));
- }
- addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE,
tableName.getNameAsString(),
- props), (ret, err3) -> {
- if (err3 != null) {
- future.completeExceptionally(err3);
+ addListener(procFuture, (ret, error) -> {
+ if (error != null) {
+ if (error instanceof DoNotRetryIOException) {
+ // Usually this is caused by the method not being present on the server, or by
+ // the HBase Hadoop version not matching the running Hadoop version.
+ // If that happens, we need to fall back to the old flush implementation.
+ LOG.info("Unrecoverable error in master side. Fallback to
FlushTableProcedure V1", error);
+ addListener(tableExists(tableName), (exists, err) -> {
Review Comment:
Ok. Thanks Duo.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]