Apache9 commented on code in PR #4246:
URL: https://github.com/apache/hbase/pull/4246#discussion_r843905776


##########
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/FlushRegionCallable.java:
##########
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.client.IsolationLevel;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.executor.EventType;
+import org.apache.hadoop.hbase.procedure2.BaseRSProcedureCallable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProcedureProtos.FlushRegionParameter;
+
[email protected]
+public class FlushRegionCallable extends BaseRSProcedureCallable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FlushRegionCallable.class);
+
+  private RegionInfo regionInfo;
+
+  private byte[] columnFamily;
+
+  @Override
+  protected void doCall() throws Exception {
+    HRegion region = rs.getRegion(regionInfo.getEncodedName());
+    if (region == null) {
+      throw new NotServingRegionException("region=" + 
regionInfo.getRegionNameAsString());
+    }
+    LOG.debug("Starting region operation on {}", region);
+    region.startRegionOperation();
+    try {
+      long readPt = region.getReadPoint(IsolationLevel.READ_COMMITTED);
+      HRegion.FlushResult res;
+      if (columnFamily == null) {
+        res = region.flush(true);
+      } else {
+        res = region.flushcache(Collections.singletonList(columnFamily),
+          false, FlushLifeCycleTracker.DUMMY);
+      }
+      if (res.getResult() == HRegion.FlushResult.Result.CANNOT_FLUSH) {
+        region.waitForFlushes();

Review Comment:
   Is it always safe to call this here?



##########
hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/AbstractRegionRemoteProcedure.java:
##########
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.procedure;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
+import org.apache.hadoop.hbase.master.assignment.RegionStates;
+import org.apache.hadoop.hbase.master.assignment.ServerState;
+import org.apache.hadoop.hbase.procedure2.FailedRemoteDispatchException;
+import org.apache.hadoop.hbase.procedure2.Procedure;
+import org.apache.hadoop.hbase.procedure2.ProcedureEvent;
+import org.apache.hadoop.hbase.procedure2.ProcedureSuspendedException;
+import org.apache.hadoop.hbase.procedure2.ProcedureUtil;
+import org.apache.hadoop.hbase.procedure2.ProcedureYieldException;
+import 
org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher.RemoteProcedure;
+import org.apache.hadoop.hbase.procedure2.RemoteProcedureException;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ProcedureProtos;
+
+/**
+ *  The base class for the remote procedures for normal operations, like flush 
or snapshot.
+ *  Normal operations do not change the region state. This is the difference 
between
+ *  {@link 
org.apache.hadoop.hbase.master.assignment.RegionRemoteProcedureBase} and
+ *  {@link 
org.apache.hadoop.hbase.master.procedure.AbstractRegionRemoteProcedure}.
+ *  It requires that the state of the region must be OPEN. If region is in 
transition state,
+ *  the procedure will suspend and retry later.
+ */
[email protected]
+public abstract class AbstractRegionRemoteProcedure extends 
Procedure<MasterProcedureEnv>

Review Comment:
   Better give it another name? Like IdempotentRegionRemoteProcedureBase?



##########
hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java:
##########
@@ -927,33 +930,59 @@ public void run(PRESP resp) {
 
   @Override
   public CompletableFuture<Void> flush(TableName tableName, byte[] 
columnFamily) {
+    // This is for keeping compatibility with old implementation.
+    // If the server version is lower than the client version, it's possible 
that the
+    // flushTable method is not present in the server side, if so, we need to 
fall back
+    // to the old implementation.
+    FlushTableRequest request = RequestConverter
+      .buildFlushTableRequest(tableName, columnFamily, ng.getNonceGroup(), 
ng.newNonce());
+    CompletableFuture<Void> procFuture =
+      this.<FlushTableRequest, FlushTableResponse>procedureCall(tableName, 
request,
+        (s, c, req, done) -> s.flushTable(c, req, done), (resp) -> 
resp.getProcId(),
+        new FlushTableProcedureBiConsumer(tableName));
+    // here we use another new CompletableFuture because the
+    // procFuture is not fully controlled by ourselves.
     CompletableFuture<Void> future = new CompletableFuture<>();
-    addListener(tableExists(tableName), (exists, err) -> {
-      if (err != null) {
-        future.completeExceptionally(err);
-      } else if (!exists) {
-        future.completeExceptionally(new TableNotFoundException(tableName));
-      } else {
-        addListener(isTableEnabled(tableName), (tableEnabled, err2) -> {
-          if (err2 != null) {
-            future.completeExceptionally(err2);
-          } else if (!tableEnabled) {
-            future.completeExceptionally(new 
TableNotEnabledException(tableName));
-          } else {
-            Map<String, String> props = new HashMap<>();
-            if (columnFamily != null) {
-              props.put(HConstants.FAMILY_KEY_STR, 
Bytes.toString(columnFamily));
-            }
-            addListener(execProcedure(FLUSH_TABLE_PROCEDURE_SIGNATURE, 
tableName.getNameAsString(),
-              props), (ret, err3) -> {
-                if (err3 != null) {
-                  future.completeExceptionally(err3);
+    addListener(procFuture, (ret, error) -> {
+      if (error != null) {
+        if (error instanceof DoNotRetryIOException) {
+          // usually this is caused by the method is not present on the server 
or
+          // the hbase hadoop version does not match the running hadoop 
version.
+          // if that happens, we need fall back to the old flush 
implementation.
+          LOG.info("Unrecoverable error in master side. Fallback to 
FlushTableProcedure V1", error);
+          addListener(tableExists(tableName), (exists, err) -> {

Review Comment:
   Better abstract this to a seprated method and call it legacyFlush(or 
something else, I'm not good at naming in English...)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to