wecharyu commented on code in PR #5851:
URL: https://github.com/apache/hive/pull/5851#discussion_r2646023198


##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AbstractAsyncOperationHandler.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;

Review Comment:
   is it better to make a deeper package like 
`org.apache.hadoop.hive.metastore.handler`? and we can add more handler for 
other operation in the future.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AbstractAsyncOperationHandler.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.metastore.api.DropTableRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
+import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
+
+public abstract class AbstractAsyncOperationHandler <T extends TBase, A> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractAsyncOperationHandler.class);
+  private static final Map<String, AbstractAsyncOperationHandler> 
OPID_TO_HANDLER = new ConcurrentHashMap<>();
+  private static final ScheduledExecutorService OPID_CLEANER = 
Executors.newScheduledThreadPool(1, r -> {
+    Thread thread = new Thread(r);
+    thread.setDaemon(true);
+    thread.setName("AsyncOperationsHandler-cleaner");
+    return thread;
+  });
+
+  private A result;
+  private boolean async;
+  private Future<A> future;
+  private ExecutorService executor;
+  private final AtomicBoolean aborted = new AtomicBoolean();
+
+  protected T request;
+  protected IHMSHandler handler;
+  protected final String id;
+  private long timeout;
+
+  private AbstractAsyncOperationHandler(String id) {
+    this.id = id;
+  }
+
+  AbstractAsyncOperationHandler(IHMSHandler handler, boolean async, T request) 
{
+    this.id = UUID.randomUUID().toString();
+    this.handler = handler;
+    this.request = request;
+    this.async = async;
+    this.timeout = MetastoreConf.getBoolVar(handler.getConf(), HIVE_IN_TEST) ? 
10 : 5000;
+    OPID_TO_HANDLER.put(id, this);
+    if (async) {
+      this.executor = Executors.newFixedThreadPool(1, r -> {
+        Thread thread = new Thread(r);
+        thread.setDaemon(true);
+        thread.setName("TableOperationsHandler " + id);
+        return thread;
+      });
+    } else {
+      this.executor = MoreExecutors.newDirectExecutorService();
+    }
+    this.future =
+        executor.submit(() -> {
+          try {
+            return execute();
+          } finally {
+            destroy();
+            OPID_CLEANER.schedule(() -> OPID_TO_HANDLER.remove(id), 1, 
TimeUnit.HOURS);
+          }
+        });
+    this.executor.shutdown();
+  }
+
+  private static <T extends TBase, A> AbstractAsyncOperationHandler<T, A>
+      ofCache(String opId, boolean shouldCancel) throws TException {
+    AbstractAsyncOperationHandler<T, A> asyncOp = null;
+    if (opId != null) {
+      asyncOp = OPID_TO_HANDLER.get(opId);
+      if (asyncOp == null && !shouldCancel) {
+        throw new MetaException("Couldn't find the async operation handler: " 
+ opId);
+      }
+      if (shouldCancel) {
+        if (asyncOp != null) {
+          asyncOp.cancelOperation();
+        } else {
+          asyncOp = new AbstractAsyncOperationHandler<>(opId) {
+            @Override
+            public OperationStatus getOperationStatus() throws TException {
+              OperationStatus resp = new OperationStatus(opId);
+              resp.setMessage("Operation has been canceled");
+              resp.setFinished(true);
+              return resp;
+            }
+            @Override
+            protected A execute() throws TException, IOException {
+              throw new UnsupportedOperationException();
+            }
+            @Override
+            public String getLogMessagePrefix() {
+              throw new UnsupportedOperationException();
+            }
+            @Override
+            public String getOperationProgress() {
+              throw new UnsupportedOperationException();
+            }
+          };
+        }
+      }
+    }
+    return asyncOp;
+  }
+
+  public static <T extends TBase, A> AbstractAsyncOperationHandler<T, A> 
offer(IHMSHandler handler, T req)
+      throws TException {
+    if (req instanceof DropTableRequest request) {
+      AbstractAsyncOperationHandler<T, A> asycOp = ofCache(request.getId(), 
request.isCancel());
+      if (asycOp == null) {
+        asycOp= (AbstractAsyncOperationHandler<T, A>)
+            new AsyncDropTableHandler(handler, request.isAsyncDrop(), request);
+      }
+      return asycOp;
+    }
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  public OperationStatus getOperationStatus() throws TException {
+    String logMsgPrefix = getLogMessagePrefix();
+    if (future == null) {
+      throw new IllegalStateException(logMsgPrefix + " hasn't started yet");
+    }
+    try {
+      result = async ? future.get(timeout, TimeUnit.MILLISECONDS) : 
future.get();
+    } catch (TimeoutException e) {
+      // No Op, return to the caller since long polling timeout has expired
+      LOG.trace("{} Long polling timed out", logMsgPrefix);
+    } catch (CancellationException e) {
+      // The background operation thread was cancelled
+      LOG.trace("{} The background operation was cancelled", logMsgPrefix);
+    } catch (ExecutionException | InterruptedException e) {
+      // No op, we will deal with this exception later
+      LOG.error("{} Failed", logMsgPrefix, e);
+      if (e.getCause() instanceof Exception ex && !aborted.get()) {
+        throw 
handleException(ex).throwIfInstance(TException.class).defaultMetaException();
+      }
+      String errorMsg = e.getCause() != null ? e.getCause().getMessage() : 
e.getMessage();
+      throw new MetaException(logMsgPrefix + " failed with " + errorMsg);
+    }
+
+    OperationStatus resp = new OperationStatus(id);
+    if (future.isDone()) {
+      resp.setFinished(true);
+      resp.setMessage(logMsgPrefix + (future.isCancelled() ? " Canceled" : " 
Done"));
+    } else {
+      resp.setMessage(logMsgPrefix + " In-progress, state - " + 
getOperationProgress());
+    }
+    return resp;
+  }
+
+  static class OperationStatus {
+    private final String id;
+    private String message;
+    private boolean finished;
+    OperationStatus(String id) {
+      this.id = id;
+    }
+
+    public String getMessage() {
+      return message;
+    }
+
+    public void setMessage(String message) {
+      this.message = message;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public boolean isFinished() {
+      return finished;
+    }
+
+    public void setFinished(boolean finished) {
+      this.finished = finished;
+    }
+  }
+
+  public void cancelOperation() {
+    if (!future.isDone()) {
+      LOG.warn("Operation: {} is still running, but a close signal is 
triggered", id);

Review Comment:
   nit: can also use `getLogMessagePrefix()`.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AbstractAsyncOperationHandler.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.metastore.api.DropTableRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
+import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
+
+public abstract class AbstractAsyncOperationHandler <T extends TBase, A> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractAsyncOperationHandler.class);
+  private static final Map<String, AbstractAsyncOperationHandler> 
OPID_TO_HANDLER = new ConcurrentHashMap<>();
+  private static final ScheduledExecutorService OPID_CLEANER = 
Executors.newScheduledThreadPool(1, r -> {
+    Thread thread = new Thread(r);
+    thread.setDaemon(true);
+    thread.setName("AsyncOperationsHandler-cleaner");
+    return thread;
+  });
+
+  private A result;
+  private boolean async;
+  private Future<A> future;
+  private ExecutorService executor;
+  private final AtomicBoolean aborted = new AtomicBoolean();
+
+  protected T request;
+  protected IHMSHandler handler;
+  protected final String id;
+  private long timeout;
+
+  private AbstractAsyncOperationHandler(String id) {
+    this.id = id;
+  }
+
+  AbstractAsyncOperationHandler(IHMSHandler handler, boolean async, T request) 
{
+    this.id = UUID.randomUUID().toString();
+    this.handler = handler;
+    this.request = request;
+    this.async = async;
+    this.timeout = MetastoreConf.getBoolVar(handler.getConf(), HIVE_IN_TEST) ? 
10 : 5000;
+    OPID_TO_HANDLER.put(id, this);
+    if (async) {
+      this.executor = Executors.newFixedThreadPool(1, r -> {
+        Thread thread = new Thread(r);
+        thread.setDaemon(true);
+        thread.setName("TableOperationsHandler " + id);
+        return thread;
+      });
+    } else {
+      this.executor = MoreExecutors.newDirectExecutorService();
+    }
+    this.future =
+        executor.submit(() -> {
+          try {
+            return execute();
+          } finally {
+            destroy();
+            OPID_CLEANER.schedule(() -> OPID_TO_HANDLER.remove(id), 1, 
TimeUnit.HOURS);
+          }
+        });
+    this.executor.shutdown();
+  }
+
+  private static <T extends TBase, A> AbstractAsyncOperationHandler<T, A>
+      ofCache(String opId, boolean shouldCancel) throws TException {
+    AbstractAsyncOperationHandler<T, A> asyncOp = null;
+    if (opId != null) {
+      asyncOp = OPID_TO_HANDLER.get(opId);
+      if (asyncOp == null && !shouldCancel) {
+        throw new MetaException("Couldn't find the async operation handler: " 
+ opId);
+      }
+      if (shouldCancel) {
+        if (asyncOp != null) {
+          asyncOp.cancelOperation();
+        } else {
+          asyncOp = new AbstractAsyncOperationHandler<>(opId) {
+            @Override
+            public OperationStatus getOperationStatus() throws TException {
+              OperationStatus resp = new OperationStatus(opId);
+              resp.setMessage("Operation has been canceled");
+              resp.setFinished(true);
+              return resp;
+            }
+            @Override
+            protected A execute() throws TException, IOException {
+              throw new UnsupportedOperationException();
+            }
+            @Override
+            public String getLogMessagePrefix() {
+              throw new UnsupportedOperationException();
+            }
+            @Override
+            public String getOperationProgress() {
+              throw new UnsupportedOperationException();
+            }
+          };
+        }
+      }
+    }
+    return asyncOp;
+  }
+
+  public static <T extends TBase, A> AbstractAsyncOperationHandler<T, A> 
offer(IHMSHandler handler, T req)
+      throws TException {
+    if (req instanceof DropTableRequest request) {
+      AbstractAsyncOperationHandler<T, A> asycOp = ofCache(request.getId(), 
request.isCancel());
+      if (asycOp == null) {
+        asycOp= (AbstractAsyncOperationHandler<T, A>)
+            new AsyncDropTableHandler(handler, request.isAsyncDrop(), request);
+      }
+      return asycOp;
+    }
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  public OperationStatus getOperationStatus() throws TException {
+    String logMsgPrefix = getLogMessagePrefix();
+    if (future == null) {
+      throw new IllegalStateException(logMsgPrefix + " hasn't started yet");
+    }
+    try {
+      result = async ? future.get(timeout, TimeUnit.MILLISECONDS) : 
future.get();
+    } catch (TimeoutException e) {
+      // No Op, return to the caller since long polling timeout has expired
+      LOG.trace("{} Long polling timed out", logMsgPrefix);
+    } catch (CancellationException e) {
+      // The background operation thread was cancelled
+      LOG.trace("{} The background operation was cancelled", logMsgPrefix);
+    } catch (ExecutionException | InterruptedException e) {
+      // No op, we will deal with this exception later
+      LOG.error("{} Failed", logMsgPrefix, e);
+      if (e.getCause() instanceof Exception ex && !aborted.get()) {
+        throw 
handleException(ex).throwIfInstance(TException.class).defaultMetaException();
+      }
+      String errorMsg = e.getCause() != null ? e.getCause().getMessage() : 
e.getMessage();
+      throw new MetaException(logMsgPrefix + " failed with " + errorMsg);
+    }
+
+    OperationStatus resp = new OperationStatus(id);
+    if (future.isDone()) {
+      resp.setFinished(true);
+      resp.setMessage(logMsgPrefix + (future.isCancelled() ? " Canceled" : " 
Done"));
+    } else {
+      resp.setMessage(logMsgPrefix + " In-progress, state - " + 
getOperationProgress());
+    }
+    return resp;
+  }
+
+  static class OperationStatus {
+    private final String id;
+    private String message;
+    private boolean finished;
+    OperationStatus(String id) {
+      this.id = id;
+    }
+
+    public String getMessage() {
+      return message;
+    }
+
+    public void setMessage(String message) {
+      this.message = message;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public boolean isFinished() {
+      return finished;
+    }
+
+    public void setFinished(boolean finished) {
+      this.finished = finished;
+    }
+  }
+
+  public void cancelOperation() {
+    if (!future.isDone()) {
+      LOG.warn("Operation: {} is still running, but a close signal is 
triggered", id);
+      future.cancel(true);
+      aborted.set(true);
+    }
+    executor.shutdown();
+  }
+
+  /**
+   * Retrieve the result after this operation is done,
+   * an IllegalStateException would raise if the operation has not completed.
+   * @return the operation result
+   * @throws TException exception while checking the status of the operation
+   */
+  public final A getResult() throws TException {
+    OperationStatus resp = getOperationStatus();
+    if (!resp.isFinished()) {
+      throw new IllegalStateException("Result is un-available as the operation 
" + id + " is still running");
+    }
+    return result;
+  }
+
+  /**
+   *  Run this operation.
+   *  @return computed result
+   * @throws TException  if unable to run the operation
+   * @throws IOException if the request is invalid
+   */
+  protected abstract A execute() throws TException, IOException;
+
+  /**
+   * Get the prefix for logging the message on polling the operation status.
+   *
+   * @return message prefix
+   */
+  protected abstract String getLogMessagePrefix();
+
+  /**
+   * Get the message about the operation progress.
+   *
+   * @return the progress
+   */
+  protected abstract String getOperationProgress();
+
+  public void checkInterrupted() throws MetaException {
+    if (aborted.get()) {
+      String errorMessage = "FAILED: drop table " + id + " has been 
interrupted";

Review Comment:
   ditto.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AbstractAsyncOperationHandler.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.metastore.api.DropTableRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
+import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
+
+public abstract class AbstractAsyncOperationHandler <T extends TBase, A> {

Review Comment:
   Can we just name it `AbstractOperationHandler` because it could be sync if 
`async` is false.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AbstractAsyncOperationHandler.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.metastore.api.DropTableRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
+import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
+
+public abstract class AbstractAsyncOperationHandler <T extends TBase, A> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractAsyncOperationHandler.class);
+  private static final Map<String, AbstractAsyncOperationHandler> 
OPID_TO_HANDLER = new ConcurrentHashMap<>();
+  private static final ScheduledExecutorService OPID_CLEANER = 
Executors.newScheduledThreadPool(1, r -> {
+    Thread thread = new Thread(r);
+    thread.setDaemon(true);
+    thread.setName("AsyncOperationsHandler-cleaner");
+    return thread;
+  });
+
+  private A result;
+  private boolean async;
+  private Future<A> future;
+  private ExecutorService executor;
+  private final AtomicBoolean aborted = new AtomicBoolean();
+
+  protected T request;
+  protected IHMSHandler handler;
+  protected final String id;
+  private long timeout;
+
+  private AbstractAsyncOperationHandler(String id) {
+    this.id = id;
+  }
+
+  AbstractAsyncOperationHandler(IHMSHandler handler, boolean async, T request) 
{
+    this.id = UUID.randomUUID().toString();
+    this.handler = handler;
+    this.request = request;
+    this.async = async;
+    this.timeout = MetastoreConf.getBoolVar(handler.getConf(), HIVE_IN_TEST) ? 
10 : 5000;
+    OPID_TO_HANDLER.put(id, this);
+    if (async) {
+      this.executor = Executors.newFixedThreadPool(1, r -> {
+        Thread thread = new Thread(r);
+        thread.setDaemon(true);
+        thread.setName("TableOperationsHandler " + id);
+        return thread;
+      });
+    } else {
+      this.executor = MoreExecutors.newDirectExecutorService();
+    }
+    this.future =
+        executor.submit(() -> {
+          try {
+            return execute();
+          } finally {
+            destroy();
+            OPID_CLEANER.schedule(() -> OPID_TO_HANDLER.remove(id), 1, 
TimeUnit.HOURS);
+          }
+        });
+    this.executor.shutdown();
+  }
+
+  private static <T extends TBase, A> AbstractAsyncOperationHandler<T, A>

Review Comment:
   How about moving this and below static method into a static factory class?



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AsyncDropTableHandler.java:
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.TableName;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropTableRequest;
+import org.apache.hadoop.hive.metastore.api.GetTableRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.events.DropTableEvent;
+import org.apache.hadoop.hive.metastore.events.PreDropTableEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.thrift.TException;
+
+import static 
org.apache.hadoop.hive.metastore.HMSHandler.checkTableDataShouldBeDeleted;
+import static 
org.apache.hadoop.hive.metastore.HMSHandler.isDbReplicationTarget;
+import static org.apache.hadoop.hive.metastore.HMSHandler.isMustPurge;
+import static 
org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
+import static 
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+
+public class AsyncDropTableHandler
+    extends AbstractAsyncOperationHandler<DropTableRequest, 
AsyncDropTableHandler.DropTableResult> {
+  private Table tbl;
+  private Path tblPath;
+  private TableName tableName;
+  private boolean tableDataShouldBeDeleted;
+  private AtomicReference<String> progress;
+
+  AsyncDropTableHandler(IHMSHandler handler, boolean async, DropTableRequest 
request) {
+    super(handler, async, request);
+  }
+
+  public DropTableResult dropTable() throws TException {

Review Comment:
   could it be a `private` method?



##########
standalone-metastore/metastore-client/src/main/java/org/apache/hadoop/hive/metastore/client/ThriftHiveMetaStoreClient.java:
##########
@@ -1679,7 +1678,22 @@ public void dropTable(String catName, String dbname, 
String name, boolean delete
     dropTableReq.setCatalogName(catName);
     dropTableReq.setDropPartitions(true);
     dropTableReq.setEnvContext(envContext);
-    client.drop_table_req(dropTableReq);
+    dropTableReq.setAsyncDrop(!isLocalMetaStore());
+    AsyncOperationResp resp = client.drop_table_req(dropTableReq);
+    dropTableReq.setId(resp.getId());
+    try {
+      while (!resp.isFinished() && !Thread.currentThread().isInterrupted()) {
+        resp = client.drop_table_req(dropTableReq);
+        if (resp.getMessage() != null) {
+          LOG.info(resp.getMessage());
+        }
+      }
+    } finally {
+      if (!resp.isFinished()) {
+        dropTableReq.setCancel(true);
+        client.drop_table_req(dropTableReq);

Review Comment:
   This is a similar issue, it may request to a new instance and could not 
cancel the operation in original instance.



##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/AbstractAsyncOperationHandler.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.metastore.api.DropTableRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static 
org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
+import static 
org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
+
+public abstract class AbstractAsyncOperationHandler <T extends TBase, A> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AbstractAsyncOperationHandler.class);
+  private static final Map<String, AbstractAsyncOperationHandler> 
OPID_TO_HANDLER = new ConcurrentHashMap<>();
+  private static final ScheduledExecutorService OPID_CLEANER = 
Executors.newScheduledThreadPool(1, r -> {
+    Thread thread = new Thread(r);
+    thread.setDaemon(true);
+    thread.setName("AsyncOperationsHandler-cleaner");
+    return thread;
+  });
+
+  private A result;
+  private boolean async;
+  private Future<A> future;
+  private ExecutorService executor;
+  private final AtomicBoolean aborted = new AtomicBoolean();
+
+  protected T request;
+  protected IHMSHandler handler;
+  protected final String id;
+  private long timeout;
+
+  private AbstractAsyncOperationHandler(String id) {
+    this.id = id;
+  }
+
+  AbstractAsyncOperationHandler(IHMSHandler handler, boolean async, T request) 
{
+    this.id = UUID.randomUUID().toString();
+    this.handler = handler;
+    this.request = request;
+    this.async = async;
+    this.timeout = MetastoreConf.getBoolVar(handler.getConf(), HIVE_IN_TEST) ? 
10 : 5000;
+    OPID_TO_HANDLER.put(id, this);
+    if (async) {
+      this.executor = Executors.newFixedThreadPool(1, r -> {
+        Thread thread = new Thread(r);
+        thread.setDaemon(true);
+        thread.setName("TableOperationsHandler " + id);
+        return thread;
+      });
+    } else {
+      this.executor = MoreExecutors.newDirectExecutorService();
+    }
+    this.future =
+        executor.submit(() -> {
+          try {
+            return execute();
+          } finally {
+            destroy();
+            OPID_CLEANER.schedule(() -> OPID_TO_HANDLER.remove(id), 1, 
TimeUnit.HOURS);
+          }
+        });
+    this.executor.shutdown();
+  }
+
+  private static <T extends TBase, A> AbstractAsyncOperationHandler<T, A>
+      ofCache(String opId, boolean shouldCancel) throws TException {
+    AbstractAsyncOperationHandler<T, A> asyncOp = null;
+    if (opId != null) {
+      asyncOp = OPID_TO_HANDLER.get(opId);
+      if (asyncOp == null && !shouldCancel) {
+        throw new MetaException("Couldn't find the async operation handler: " 
+ opId);
+      }
+      if (shouldCancel) {
+        if (asyncOp != null) {
+          asyncOp.cancelOperation();
+        } else {
+          asyncOp = new AbstractAsyncOperationHandler<>(opId) {
+            @Override
+            public OperationStatus getOperationStatus() throws TException {
+              OperationStatus resp = new OperationStatus(opId);
+              resp.setMessage("Operation has been canceled");
+              resp.setFinished(true);
+              return resp;
+            }
+            @Override
+            protected A execute() throws TException, IOException {
+              throw new UnsupportedOperationException();
+            }
+            @Override
+            public String getLogMessagePrefix() {
+              throw new UnsupportedOperationException();
+            }
+            @Override
+            public String getOperationProgress() {
+              throw new UnsupportedOperationException();
+            }
+          };
+        }
+      }
+    }
+    return asyncOp;
+  }
+
+  public static <T extends TBase, A> AbstractAsyncOperationHandler<T, A> 
offer(IHMSHandler handler, T req)
+      throws TException {
+    if (req instanceof DropTableRequest request) {
+      AbstractAsyncOperationHandler<T, A> asycOp = ofCache(request.getId(), 
request.isCancel());
+      if (asycOp == null) {
+        asycOp= (AbstractAsyncOperationHandler<T, A>)
+            new AsyncDropTableHandler(handler, request.isAsyncDrop(), request);
+      }
+      return asycOp;
+    }
+    throw new UnsupportedOperationException("Not yet implemented");
+  }
+
+  public OperationStatus getOperationStatus() throws TException {
+    String logMsgPrefix = getLogMessagePrefix();
+    if (future == null) {
+      throw new IllegalStateException(logMsgPrefix + " hasn't started yet");
+    }
+    try {
+      result = async ? future.get(timeout, TimeUnit.MILLISECONDS) : 
future.get();
+    } catch (TimeoutException e) {
+      // No Op, return to the caller since long polling timeout has expired
+      LOG.trace("{} Long polling timed out", logMsgPrefix);
+    } catch (CancellationException e) {
+      // The background operation thread was cancelled
+      LOG.trace("{} The background operation was cancelled", logMsgPrefix);
+    } catch (ExecutionException | InterruptedException e) {
+      // No op, we will deal with this exception later
+      LOG.error("{} Failed", logMsgPrefix, e);
+      if (e.getCause() instanceof Exception ex && !aborted.get()) {
+        throw 
handleException(ex).throwIfInstance(TException.class).defaultMetaException();
+      }
+      String errorMsg = e.getCause() != null ? e.getCause().getMessage() : 
e.getMessage();
+      throw new MetaException(logMsgPrefix + " failed with " + errorMsg);
+    }
+
+    OperationStatus resp = new OperationStatus(id);
+    if (future.isDone()) {
+      resp.setFinished(true);
+      resp.setMessage(logMsgPrefix + (future.isCancelled() ? " Canceled" : " 
Done"));
+    } else {
+      resp.setMessage(logMsgPrefix + " In-progress, state - " + 
getOperationProgress());
+    }
+    return resp;
+  }
+
+  static class OperationStatus {
+    private final String id;
+    private String message;
+    private boolean finished;
+    OperationStatus(String id) {
+      this.id = id;
+    }
+
+    public String getMessage() {
+      return message;
+    }
+
+    public void setMessage(String message) {
+      this.message = message;
+    }
+
+    public String getId() {
+      return id;
+    }
+
+    public boolean isFinished() {
+      return finished;
+    }
+
+    public void setFinished(boolean finished) {
+      this.finished = finished;
+    }
+  }
+
+  public void cancelOperation() {
+    if (!future.isDone()) {
+      LOG.warn("Operation: {} is still running, but a close signal is 
triggered", id);
+      future.cancel(true);
+      aborted.set(true);
+    }
+    executor.shutdown();
+  }
+
+  /**
+   * Retrieve the result after this operation is done,
+   * an IllegalStateException would raise if the operation has not completed.
+   * @return the operation result
+   * @throws TException exception while checking the status of the operation
+   */
+  public final A getResult() throws TException {
+    OperationStatus resp = getOperationStatus();
+    if (!resp.isFinished()) {
+      throw new IllegalStateException("Result is un-available as the operation 
" + id + " is still running");

Review Comment:
   ditto.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to