wecharyu commented on code in PR #5851:
URL: https://github.com/apache/hive/pull/5851#discussion_r2659011482
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java:
##########
@@ -5401,13 +4643,27 @@ public boolean drop_partition_req(final DropPartitionRequest dropPartitionReq) t
boolean ret = false;
Exception ex = null;
try {
+ Table t = getMS().getTable(catName, dbName, tbl_name, null);
+ if (t == null) {
+ throw new InvalidObjectException(dbName + "." + tbl_name
+ + " table not found");
+ }
+ List<String> partNames = new ArrayList<>();
if (part_vals == null || part_vals.isEmpty()) {
- part_vals = getPartValsFromName(getMS(), catName, dbName, tbl_name, dropPartitionReq.getPartName());
+ part_vals = getPartValsFromName(t, dropPartitionReq.getPartName());
}
+ partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part_vals));
Review Comment:
```suggestion
partNames.add(dropPartitionReq.getPartName());
} else {
      partNames.add(Warehouse.makePartName(t.getPartitionKeys(), part_vals));
}
```
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java:
##########
@@ -335,6 +336,17 @@ void createTable(Table tbl) throws InvalidObjectException,
boolean dropTable(String catalogName, String dbName, String tableName)
throws MetaException, NoSuchObjectException, InvalidObjectException,
InvalidInputException;
+ /**
+ * Drop all partitions from the table, and return the partition's location that not a child of baseLocationToNotShow,
+ * when the baseLocationToNotShow is not null.
+ * @param table the table to drop partitions from
+ * @param baseLocationToNotShow Partition locations which are child of this path are omitted
Review Comment:
add `@param message`
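For example, something along these lines (the wording is only a guess at the intent and should be adjusted to how `message` is actually used in the implementation):
```java
   * @param message holder that the implementation can update with a progress/status message while the partitions are being dropped
```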
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java:
##########
@@ -1855,7 +1591,7 @@ public void drop_database(final String dbName, final boolean deleteData, final b
}
@Override
- public void drop_database_req(final DropDatabaseRequest req)
+ public AsyncOperationResp drop_database_req(final DropDatabaseRequest req)
throws NoSuchObjectException, InvalidOperationException, MetaException {
startFunction("drop_database", ": " + req.getName());
Review Comment:
If the operation is async, the recorded API time will not reflect how long the operation actually takes to finish. Should we record it in the handler instead?
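A rough sketch of what that could look like inside the async handler (illustrative only; `executor`, `future` and `execute()` are assumed handler members similar to those in `AbstractOperationHandler`, and the timing/log call is not existing HMS instrumentation):
```java
// Measure the cost where the work actually runs, so async operations report
// the full duration rather than just the time spent submitting the request.
long startNanos = System.nanoTime();
future = executor.submit(() -> {
  try {
    return execute();
  } finally {
    long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
    LOG.info("Async operation finished in {} ms", elapsedMs);
  }
});
```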
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java:
##########
@@ -5125,257 +4091,57 @@ public boolean drop_partition(final String db_name, final String tbl_name,
null);
}
- /** Stores a path and its size. */
- private static class PathAndDepth implements Comparable<PathAndDepth> {
- final Path path;
- final int depth;
-
- public PathAndDepth(Path path, int depth) {
- this.path = path;
- this.depth = depth;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(path.hashCode(), depth);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PathAndDepth that = (PathAndDepth) o;
- return depth == that.depth && Objects.equals(path, that.path);
- }
-
- /** The largest {@code depth} is processed first in a {@link PriorityQueue}. */
- @Override
- public int compareTo(PathAndDepth o) {
- return o.depth - depth;
- }
- }
-
@Override
public DropPartitionsResult drop_partitions_req(
DropPartitionsRequest request) throws TException {
- RawStore ms = getMS();
- String dbName = request.getDbName(), tblName = request.getTblName();
- String catName = request.isSetCatName() ? request.getCatName() : getDefaultCatalog(conf);
-
- boolean ifExists = request.isSetIfExists() && request.isIfExists();
- boolean deleteData = request.isSetDeleteData() && request.isDeleteData();
- boolean ignoreProtection = request.isSetIgnoreProtection() && request.isIgnoreProtection();
- boolean needResult = !request.isSetNeedResult() || request.isNeedResult();
-
- List<PathAndDepth> dirsToDelete = new ArrayList<>();
- List<Path> archToDelete = new ArrayList<>();
- EnvironmentContext envContext =
- request.isSetEnvironmentContext() ? request.getEnvironmentContext() : null;
- boolean success = false;
-
- Table tbl = null;
- List<Partition> parts = null;
- boolean mustPurge = false;
- boolean tableDataShouldBeDeleted = false;
- long writeId = 0;
-
- Map<String, String> transactionalListenerResponses = null;
- boolean needsCm = false;
-
try {
- ms.openTransaction();
- // We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
- // Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes.
- GetTableRequest getTableRequest = new GetTableRequest(dbName, tblName);
- getTableRequest.setCatName(catName);
- tbl = get_table_core(getTableRequest);
- mustPurge = isMustPurge(envContext, tbl);
- tableDataShouldBeDeleted = checkTableDataShouldBeDeleted(tbl, deleteData);
- writeId = getWriteId(envContext);
-
- boolean hasMissingParts = false;
- RequestPartsSpec spec = request.getParts();
- List<String> partNames = null;
-
- if (spec.isSetExprs()) {
- // Dropping by expressions.
- parts = new ArrayList<>(spec.getExprs().size());
- for (DropPartitionsExpr expr : spec.getExprs()) {
- List<Partition> result = new ArrayList<>();
- boolean hasUnknown = ms.getPartitionsByExpr(catName, dbName, tblName, result,
- new GetPartitionsArgs.GetPartitionsArgsBuilder()
- .expr(expr.getExpr()).skipColumnSchemaForPartition(request.isSkipColumnSchemaForPartition())
- .build());
- if (hasUnknown) {
- // Expr is built by DDLSA, it should only contain part cols and simple ops
- throw new MetaException("Unexpected unknown partitions to drop");
- }
- // this is to prevent dropping archived partition which is archived in a
- // different level the drop command specified.
- if (!ignoreProtection && expr.isSetPartArchiveLevel()) {
- for (Partition part : parts) {
- if (MetaStoreUtils.isArchived(part)
- && MetaStoreUtils.getArchivingLevel(part) < expr.getPartArchiveLevel()) {
- throw new MetaException("Cannot drop a subset of partitions "
- + " in an archive, partition " + part);
- }
- }
- }
- if (result.isEmpty()) {
- hasMissingParts = true;
- if (!ifExists) {
- // fail-fast for missing partition expr
- break;
- }
- }
- parts.addAll(result);
- }
- } else if (spec.isSetNames()) {
- partNames = spec.getNames();
- parts = ms.getPartitionsByNames(catName, dbName, tblName,
- new GetPartitionsArgs.GetPartitionsArgsBuilder()
- .partNames(partNames).skipColumnSchemaForPartition(request.isSkipColumnSchemaForPartition())
- .build());
- hasMissingParts = (parts.size() != partNames.size());
- } else {
- throw new MetaException("Partition spec is not set");
- }
-
- if (hasMissingParts && !ifExists) {
- throw new NoSuchObjectException("Some partitions to drop are missing");
- }
-
- List<String> colNames = null;
- if (partNames == null) {
- partNames = new ArrayList<>(parts.size());
- colNames = new ArrayList<>(tbl.getPartitionKeys().size());
- for (FieldSchema col : tbl.getPartitionKeys()) {
- colNames.add(col.getName());
- }
- }
-
- for (Partition part : parts) {
- // TODO - we need to speed this up for the normal path where all partitions are under
- // the table and we don't have to stat every partition
-
- firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this));
- if (colNames != null) {
- partNames.add(FileUtils.makePartName(colNames, part.getValues()));
- }
- if (tableDataShouldBeDeleted) {
- if (MetaStoreUtils.isArchived(part)) {
- // Archived partition is only able to delete original location.
- Path archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
- verifyIsWritablePath(archiveParentDir);
- archToDelete.add(archiveParentDir);
- } else if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
- Path partPath = new Path(part.getSd().getLocation());
- verifyIsWritablePath(partPath);
- dirsToDelete.add(new PathAndDepth(partPath, part.getValues().size()));
+ DropPartitionsResult resp = new DropPartitionsResult();
+ AbstractOperationHandler<DropPartitionsRequest, DropPartitionsHandler.DropPartitionsResult> dropPartsOp =
+ AbstractOperationHandler.offer(this, request);
Review Comment:
Such calls can be changed as follows (and similarly for the other related APIs):
```suggestion
DropPartitionsHandler dropPartsOp = (DropPartitionsHandler) AbstractOperationHandler.offer(this, request);
```
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/AbstractOperationHandler.java:
##########
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.MoreExecutors;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.api.AddPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.DropDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.DropPartitionsRequest;
+import org.apache.hadoop.hive.metastore.api.DropTableRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.thrift.TBase;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.hadoop.hive.metastore.ExceptionHandler.handleException;
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
+
+public abstract class AbstractOperationHandler<T extends TBase, A> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractOperationHandler.class);
+ private static final Map<String, AbstractOperationHandler> OPID_TO_HANDLER = new ConcurrentHashMap<>();
+ private static final ScheduledExecutorService OPID_CLEANER = Executors.newScheduledThreadPool(1, r -> {
+ Thread thread = new Thread(r);
+ thread.setDaemon(true);
+ thread.setName("OperationHandler-Cleaner");
+ return thread;
+ });
+
+ private A result;
+ private boolean async;
+ private Future<A> future;
+ private ExecutorService executor;
Review Comment:
Is it better to use a shared thread pool for the operation handler? In the
current implementation, the number of threads is not bounded, which could lead
to resource exhaustion or even crashes.
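A minimal sketch of the shared-pool alternative (the field name, pool size, and thread naming are illustrative assumptions, not the existing code):
```java
// One bounded pool shared by all operation handlers, so the total number of
// worker threads has a fixed upper bound instead of growing per handler.
private static final ExecutorService SHARED_EXECUTOR =
    Executors.newFixedThreadPool(8, r -> {
      Thread t = new Thread(r);
      t.setDaemon(true);
      t.setName("OperationHandler-Worker");
      return t;
    });
```
Each handler would then submit its work to `SHARED_EXECUTOR` instead of creating its own executor; the pool size (hard-coded to 8 here) would presumably come from a metastore configuration property.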
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java:
##########
@@ -1865,18 +1601,65 @@ public void drop_database_req(final DropDatabaseRequest req)
throw new MetaException("Can not drop " + DEFAULT_DATABASE_NAME + " database in catalog "
+ DEFAULT_CATALOG_NAME);
}
- boolean success = false;
+
Exception ex = null;
try {
- drop_database_core(getMS(), req);
- success = true;
+ AbstractOperationHandler<DropDatabaseRequest, DropDatabaseHandler.DropDatabaseResult> dropDatabaseOp =
+ AbstractOperationHandler.offer(this, req);
+ AbstractOperationHandler.OperationStatus status = dropDatabaseOp.getOperationStatus();
+ if (status.isFinished() && dropDatabaseOp.getResult() != null && dropDatabaseOp.getResult().isSuccess()) {
Review Comment:
`dropDatabaseOp.getResult() != null` is redundant and can be removed.
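i.e. the condition could presumably be simplified to:
```java
if (status.isFinished() && dropDatabaseOp.getResult().isSuccess()) {
```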
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/RawStore.java:
##########
@@ -335,6 +336,17 @@ void createTable(Table tbl) throws InvalidObjectException,
boolean dropTable(String catalogName, String dbName, String tableName)
throws MetaException, NoSuchObjectException, InvalidObjectException,
InvalidInputException;
+ /**
+ * Drop all partitions from the table, and return the partition's location that not a child of baseLocationToNotShow,
+ * when the baseLocationToNotShow is not null.
+ * @param table the table to drop partitions from
+ * @param baseLocationToNotShow Partition locations which are child of this path are omitted
+ * @return list of partition locations outside baseLocationToNotShow
+ * @throws MetaException something went wrong, usually in the RDBMS or storage
+ * @throws InvalidInputException unable to drop all partitions due to the invalid input
+ */
+ List<String> dropAllPartitionsAndGetLocations(TableName table, String baseLocationToNotShow, AtomicReference<String> message)
+ throws MetaException, InvalidInputException, NoSuchObjectException, InvalidObjectException;
Review Comment:
nit: add an empty line
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HMSHandler.java:
##########
@@ -5125,257 +4567,57 @@ public boolean drop_partition(final String db_name, final String tbl_name,
null);
}
- /** Stores a path and its size. */
- private static class PathAndDepth implements Comparable<PathAndDepth> {
- final Path path;
- final int depth;
-
- public PathAndDepth(Path path, int depth) {
- this.path = path;
- this.depth = depth;
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(path.hashCode(), depth);
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
- PathAndDepth that = (PathAndDepth) o;
- return depth == that.depth && Objects.equals(path, that.path);
- }
-
- /** The largest {@code depth} is processed first in a {@link PriorityQueue}. */
- @Override
- public int compareTo(PathAndDepth o) {
- return o.depth - depth;
- }
- }
-
@Override
public DropPartitionsResult drop_partitions_req(
DropPartitionsRequest request) throws TException {
- RawStore ms = getMS();
- String dbName = request.getDbName(), tblName = request.getTblName();
- String catName = request.isSetCatName() ? request.getCatName() : getDefaultCatalog(conf);
-
- boolean ifExists = request.isSetIfExists() && request.isIfExists();
- boolean deleteData = request.isSetDeleteData() && request.isDeleteData();
- boolean ignoreProtection = request.isSetIgnoreProtection() && request.isIgnoreProtection();
- boolean needResult = !request.isSetNeedResult() || request.isNeedResult();
-
- List<PathAndDepth> dirsToDelete = new ArrayList<>();
- List<Path> archToDelete = new ArrayList<>();
- EnvironmentContext envContext =
- request.isSetEnvironmentContext() ? request.getEnvironmentContext() : null;
- boolean success = false;
-
- Table tbl = null;
- List<Partition> parts = null;
- boolean mustPurge = false;
- boolean tableDataShouldBeDeleted = false;
- long writeId = 0;
-
- Map<String, String> transactionalListenerResponses = null;
- boolean needsCm = false;
-
try {
- ms.openTransaction();
- // We need Partition-s for firing events and for result; DN needs MPartition-s to drop.
- // Great... Maybe we could bypass fetching MPartitions by issuing direct SQL deletes.
- GetTableRequest getTableRequest = new GetTableRequest(dbName, tblName);
- getTableRequest.setCatName(catName);
- tbl = get_table_core(getTableRequest);
- mustPurge = isMustPurge(envContext, tbl);
- tableDataShouldBeDeleted = checkTableDataShouldBeDeleted(tbl, deleteData);
- writeId = getWriteId(envContext);
-
- boolean hasMissingParts = false;
- RequestPartsSpec spec = request.getParts();
- List<String> partNames = null;
-
- if (spec.isSetExprs()) {
- // Dropping by expressions.
- parts = new ArrayList<>(spec.getExprs().size());
- for (DropPartitionsExpr expr : spec.getExprs()) {
- List<Partition> result = new ArrayList<>();
- boolean hasUnknown = ms.getPartitionsByExpr(catName, dbName, tblName, result,
- new GetPartitionsArgs.GetPartitionsArgsBuilder()
- .expr(expr.getExpr()).skipColumnSchemaForPartition(request.isSkipColumnSchemaForPartition())
- .build());
- if (hasUnknown) {
- // Expr is built by DDLSA, it should only contain part cols and simple ops
- throw new MetaException("Unexpected unknown partitions to drop");
- }
- // this is to prevent dropping archived partition which is archived in a
- // different level the drop command specified.
- if (!ignoreProtection && expr.isSetPartArchiveLevel()) {
- for (Partition part : parts) {
- if (MetaStoreUtils.isArchived(part)
- && MetaStoreUtils.getArchivingLevel(part) < expr.getPartArchiveLevel()) {
- throw new MetaException("Cannot drop a subset of partitions "
- + " in an archive, partition " + part);
- }
- }
- }
- if (result.isEmpty()) {
- hasMissingParts = true;
- if (!ifExists) {
- // fail-fast for missing partition expr
- break;
- }
+ DropPartitionsResult resp = new DropPartitionsResult();
+ AbstractOperationHandler<DropPartitionsRequest, DropPartitionsHandler.DropPartitionsResult> dropPartsOp =
+ AbstractOperationHandler.offer(this, request);
+ AbstractOperationHandler.OperationStatus status = dropPartsOp.getOperationStatus();
+ if (status.isFinished() && dropPartsOp.getResult() != null && dropPartsOp.getResult().success()) {
+ DropPartitionsHandler.DropPartitionsResult result = dropPartsOp.getResult();
+ long writeId = getWriteId(request.getEnvironmentContext());
+ if (result.tableDataShouldBeDeleted()) {
+ LOG.info(result.mustPurge() ?
+ "dropPartition() will purge partition-directories directly,
skipping trash."
+ : "dropPartition() will move partition-directories to
trash-directory.");
+ // Archived partitions have har:/to_har_file as their location.
+ // The original directory was saved in params
+ for (Path path : result.getArchToDelete()) {
+ wh.deleteDir(path, result.mustPurge(), result.needCm());
}
- parts.addAll(result);
- }
- } else if (spec.isSetNames()) {
- partNames = spec.getNames();
- parts = ms.getPartitionsByNames(catName, dbName, tblName,
- new GetPartitionsArgs.GetPartitionsArgsBuilder()
- .partNames(partNames).skipColumnSchemaForPartition(request.isSkipColumnSchemaForPartition())
- .build());
- hasMissingParts = (parts.size() != partNames.size());
- } else {
- throw new MetaException("Partition spec is not set");
- }
-
- if (hasMissingParts && !ifExists) {
- throw new NoSuchObjectException("Some partitions to drop are missing");
- }
-
- List<String> colNames = null;
- if (partNames == null) {
- partNames = new ArrayList<>(parts.size());
- colNames = new ArrayList<>(tbl.getPartitionKeys().size());
- for (FieldSchema col : tbl.getPartitionKeys()) {
- colNames.add(col.getName());
- }
- }
-
- for (Partition part : parts) {
- // TODO - we need to speed this up for the normal path where all partitions are under
- // the table and we don't have to stat every partition
-
- firePreEvent(new PreDropPartitionEvent(tbl, part, deleteData, this));
- if (colNames != null) {
- partNames.add(FileUtils.makePartName(colNames, part.getValues()));
- }
- if (tableDataShouldBeDeleted) {
- if (MetaStoreUtils.isArchived(part)) {
- // Archived partition is only able to delete original location.
- Path archiveParentDir = MetaStoreUtils.getOriginalLocation(part);
- verifyIsWritablePath(archiveParentDir);
- archToDelete.add(archiveParentDir);
- } else if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
- Path partPath = new Path(part.getSd().getLocation());
- verifyIsWritablePath(partPath);
- dirsToDelete.add(new PathAndDepth(partPath, part.getValues().size()));
- }
- }
- }
- ms.dropPartitions(catName, dbName, tblName, partNames);
- if (!parts.isEmpty() && !transactionalListeners.isEmpty()) {
- transactionalListenerResponses = MetaStoreListenerNotifier
- .notifyEvent(transactionalListeners, EventType.DROP_PARTITION,
- new DropPartitionEvent(tbl, parts, true, deleteData, this), envContext);
- }
- success = ms.commitTransaction();
- needsCm = ReplChangeManager.shouldEnableCm(ms.getDatabase(catName, dbName), tbl);
-
- DropPartitionsResult result = new DropPartitionsResult();
- if (needResult) {
- result.setPartitions(parts);
- }
- return result;
- } finally {
- if (!success) {
- ms.rollbackTransaction();
- } else if (tableDataShouldBeDeleted) {
- LOG.info(mustPurge ?
- "dropPartition() will purge partition-directories directly,
skipping trash."
- : "dropPartition() will move partition-directories to
trash-directory.");
- // Archived partitions have har:/to_har_file as their location.
- // The original directory was saved in params
- for (Path path : archToDelete) {
- wh.deleteDir(path, mustPurge, needsCm);
- }
-
- // Uses a priority queue to delete the parents of deleted directories if empty.
- // Parents with the deepest path are always processed first. It guarantees that the emptiness
- // of a parent won't be changed once it has been processed. So duplicated processing can be
- // avoided.
- PriorityQueue<PathAndDepth> parentsToDelete = new PriorityQueue<>();
- for (PathAndDepth p : dirsToDelete) {
- wh.deleteDir(p.path, mustPurge, needsCm);
- addParentForDel(parentsToDelete, p);
- }
-
- HashSet<PathAndDepth> processed = new HashSet<>();
- while (!parentsToDelete.isEmpty()) {
- try {
- PathAndDepth p = parentsToDelete.poll();
- if (processed.contains(p)) {
- continue;
+ // Uses a priority queue to delete the parents of deleted directories if empty.
+ // Parents with the deepest path are always processed first. It guarantees that the emptiness
+ // of a parent won't be changed once it has been processed. So duplicated processing can be
+ // avoided.
+ for (Iterator<DropPartitionsHandler.PathAndDepth> iterator = result.getDirsToDelete(); iterator.hasNext();) {
+ DropPartitionsHandler.PathAndDepth p = iterator.next();
+ Path path = p.path();
+ if (p.isPartitionDir()) {
+ wh.deleteDir(path, result.mustPurge(), result.needCm());
+ } else if (wh.isWritable(path) && wh.isEmptyDir(path)) {
+ wh.deleteDir(path, result.mustPurge(), result.needCm());
}
- processed.add(p);
-
- Path path = p.path;
- if (wh.isWritable(path) && wh.isEmptyDir(path)) {
- wh.deleteDir(path, mustPurge, needsCm);
- addParentForDel(parentsToDelete, p);
- }
- } catch (IOException ex) {
- LOG.warn("Error from recursive parent deletion", ex);
- throw new MetaException("Failed to delete parent: " +
ex.getMessage());
}
- }
- } else if (TxnUtils.isTransactionalTable(tbl) && writeId > 0) {
- for (Partition part : parts) {
- if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
- Path partPath = new Path(part.getSd().getLocation());
- verifyIsWritablePath(partPath);
-
- addTruncateBaseFile(partPath, writeId, conf, DataFormat.DROPPED);
+ } else if (result.isTransactionalTable() && writeId > 0) {
+ for (Partition part : result.getPartitions()) {
+ if ((part.getSd() != null) && (part.getSd().getLocation() != null)) {
+ Path partPath = new Path(part.getSd().getLocation());
+ ((DropPartitionsHandler) dropPartsOp).verifyIsWritablePath(partPath);
+ addTruncateBaseFile(partPath, writeId, conf, DataFormat.DROPPED);
+ }
}
}
- }
-
- if (parts != null) {
- if (!parts.isEmpty() && !listeners.isEmpty()) {
- MetaStoreListenerNotifier.notifyEvent(listeners,
- EventType.DROP_PARTITION,
- new DropPartitionEvent(tbl, parts, success, deleteData, this), envContext,
- transactionalListenerResponses, ms);
+ if (request.isNeedResult()) {
+ resp.setPartitions(result.getPartitions());
}
}
- }
- }
-
- private static void addParentForDel(PriorityQueue<PathAndDepth> parentsToDelete, PathAndDepth p) {
- Path parent = p.path.getParent();
- if (parent != null && p.depth - 1 > 0) {
- parentsToDelete.add(new PathAndDepth(parent, p.depth - 1));
- }
- }
-
- private void verifyIsWritablePath(Path dir) throws MetaException {
- try {
- if (!wh.isWritable(dir.getParent())) {
- throw new MetaException("Table partition not deleted since " +
dir.getParent()
- + " is not writable by " + SecurityUtils.getUser());
- }
- } catch (IOException ex) {
- LOG.warn("Error from isWritable", ex);
- throw new MetaException("Table partition not deleted since " +
dir.getParent()
- + " access cannot be checked: " + ex.getMessage());
+ return resp;
Review Comment:
If the returned `partitions` of `resp` is empty, the client cannot tell whether the operation has finished or is still running.
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropDatabaseHandler.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.Batchable;
+import org.apache.hadoop.hive.metastore.HMSHandler;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.DropPackageRequest;
+import org.apache.hadoop.hive.metastore.api.DropTableRequest;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.ListPackageRequest;
+import org.apache.hadoop.hive.metastore.api.ListStoredProcedureRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.PreDropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.thrift.TException;
+
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.checkTableDataShouldBeDeleted;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isDbReplicationTarget;
+import static org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
+
+public class DropDatabaseHandler
+ extends AbstractOperationHandler<DropDatabaseRequest, DropDatabaseHandler.DropDatabaseResult> {
+ private String name;
+ private String catalogName;
+ private Database db;
+ private List<Table> tables;
+ private List<Function> functions;
+ private List<String> procedures;
+ private List<String> packages;
+ private AtomicReference<String> progress;
+ private DropDatabaseResult result;
+
+ DropDatabaseHandler(IHMSHandler handler, DropDatabaseRequest request)
+ throws TException, IOException {
+ super(handler, request.isAsyncDrop(), request);
+ }
+
+ public DropDatabaseResult execute() throws TException, IOException {
+ boolean success = false;
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+ RawStore rs = handler.getMS();
+ rs.openTransaction();
+ try {
+ if (MetaStoreUtils.isDatabaseRemote(db)) {
+ if (rs.dropDatabase(db.getCatalogName(), db.getName())) {
+ success = rs.commitTransaction();
+ }
+ return result;
+ }
+ List<Path> partitionPaths = new ArrayList<>();
+ // drop any functions before dropping db
+ for (int i = 0, j = functions.size(); i < functions.size(); i++, j--) {
+ progress.set("Dropping functions from the database, " + j + "
functions left");
+ Function func = functions.get(i);
+ rs.dropFunction(catalogName, name, func.getFunctionName());
+ }
+
+ for (int i = 0, j = procedures.size(); i < procedures.size(); i++, j--) {
+ progress.set("Dropping procedures from the database, " + j + "
procedures left");
+ String procName = procedures.get(i);
+ rs.dropStoredProcedure(catalogName, name, procName);
+ }
+
+ for (int i = 0, j = packages.size(); i < packages.size(); i++, j--) {
+ progress.set("Dropping packages from the database, " + j + " packages
left");
+ String pkgName = packages.get(i);
+ rs.dropPackage(new DropPackageRequest(catalogName, name, pkgName));
+ }
+
+ List<Table> tablesToDrop = sortTablesToDrop();
+ for (int i = 0, j = tablesToDrop.size(); i < tablesToDrop.size(); i++, j--) {
+ progress.set("Dropping tables from the database, " + j + " tables left");
+ checkInterrupted();
+ Table table = tablesToDrop.get(i);
+ boolean isSoftDelete = TxnUtils.isTableSoftDeleteEnabled(table, request.isSoftDelete());
+ boolean tableDataShouldBeDeleted = checkTableDataShouldBeDeleted(table, request.isDeleteData())
+ && !isSoftDelete;
+
+ EnvironmentContext context = null;
+ if (isSoftDelete) {
+ context = new EnvironmentContext();
+ context.putToProperties(hive_metastoreConstants.TXN_ID, String.valueOf(request.getTxnId()));
+ request.setDeleteManagedDir(false);
+ }
+ DropTableRequest dropRequest = new DropTableRequest(name, table.getTableName());
+ dropRequest.setCatalogName(catalogName);
+ dropRequest.setEnvContext(context);
+ // Drop the table but not its data
+ dropRequest.setDeleteData(false);
+ dropRequest.setDropPartitions(true);
Review Comment:
Better to explicitly disable asyncDrop here:
```java
dropRequest.setAsyncDrop(false);
```
##########
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/handler/DropDatabaseHandler.java:
##########
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.metastore.handler;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.Batchable;
+import org.apache.hadoop.hive.metastore.HMSHandler;
+import org.apache.hadoop.hive.metastore.IHMSHandler;
+import org.apache.hadoop.hive.metastore.MetaStoreListenerNotifier;
+import org.apache.hadoop.hive.metastore.RawStore;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.Warehouse;
+import org.apache.hadoop.hive.metastore.api.Database;
+import org.apache.hadoop.hive.metastore.api.DropDatabaseRequest;
+import org.apache.hadoop.hive.metastore.api.DropPackageRequest;
+import org.apache.hadoop.hive.metastore.api.DropTableRequest;
+import org.apache.hadoop.hive.metastore.api.EnvironmentContext;
+import org.apache.hadoop.hive.metastore.api.Function;
+import org.apache.hadoop.hive.metastore.api.InvalidOperationException;
+import org.apache.hadoop.hive.metastore.api.ListPackageRequest;
+import org.apache.hadoop.hive.metastore.api.ListStoredProcedureRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.ResourceUri;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.events.DropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.events.PreDropDatabaseEvent;
+import org.apache.hadoop.hive.metastore.messaging.EventMessage;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.FileUtils;
+import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.utils.SecurityUtils;
+import org.apache.thrift.TException;
+
+import static org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars.HIVE_IN_TEST;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.checkTableDataShouldBeDeleted;
+import static org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils.isDbReplicationTarget;
+
+public class DropDatabaseHandler
+ extends AbstractOperationHandler<DropDatabaseRequest, DropDatabaseHandler.DropDatabaseResult> {
+ private String name;
+ private Database db;
+ private List<Table> tables;
+ private List<Function> functions;
+ private List<String> procedures;
+ private List<String> packages;
+ private AtomicReference<String> progress;
+ private DropDatabaseResult result;
+
+ DropDatabaseHandler(IHMSHandler handler, DropDatabaseRequest request)
+ throws TException, IOException {
+ super(handler, request.isAsyncDrop(), request);
+ }
+
+ public DropDatabaseResult execute() throws TException, IOException {
+ boolean success = false;
+ Map<String, String> transactionalListenerResponses = Collections.emptyMap();
+ RawStore rs = handler.getMS();
+ rs.openTransaction();
+ try {
+ if (MetaStoreUtils.isDatabaseRemote(db)) {
+ if (rs.dropDatabase(db.getCatalogName(), db.getName())) {
+ success = rs.commitTransaction();
+ }
+ return result;
+ }
+ List<Path> partitionPaths = new ArrayList<>();
+ // drop any functions before dropping db
+ for (int i = 0, j = functions.size(); i < functions.size(); i++, j--) {
+ progress.set("Dropping functions from the database, " + j + "
functions left");
+ Function func = functions.get(i);
+ rs.dropFunction(request.getCatalogName(), request.getName(),
func.getFunctionName());
+ }
+
+ for (int i = 0, j = procedures.size(); i < procedures.size(); i++, j--) {
+ progress.set("Dropping procedures from the database, " + j + "
procedures left");
+ String procName = procedures.get(i);
+ rs.dropStoredProcedure(request.getCatalogName(), request.getName(),
procName);
+ }
+
+ for (int i = 0, j = packages.size(); i < packages.size(); i++, j--) {
+ progress.set("Dropping packages from the database, " + j + " packages
left");
+ String pkgName = packages.get(i);
+ rs.dropPackage(new DropPackageRequest(request.getCatalogName(),
request.getName(), pkgName));
+ }
+
+ List<Table> tablesToDrop = sortTablesToDrop();
+ for (int i = 0, j = tablesToDrop.size(); i < tablesToDrop.size(); i++, j--) {
+ progress.set("Dropping tables from the database, " + j + " tables left");
+ checkInterrupted();
+ Table table = tablesToDrop.get(i);
+ boolean isSoftDelete = TxnUtils.isTableSoftDeleteEnabled(table, request.isSoftDelete());
+ boolean tableDataShouldBeDeleted = checkTableDataShouldBeDeleted(table, request.isDeleteData())
+ && !isSoftDelete;
+
+ EnvironmentContext context = null;
+ if (isSoftDelete) {
+ context = new EnvironmentContext();
+ context.putToProperties(hive_metastoreConstants.TXN_ID, String.valueOf(request.getTxnId()));
+ request.setDeleteManagedDir(false);
+ }
+ DropTableRequest dropRequest = new DropTableRequest(request.getName(), table.getTableName());
+ dropRequest.setCatalogName(request.getCatalogName());
+ dropRequest.setEnvContext(context);
+ // Drop the table but not its data
+ dropRequest.setDeleteData(false);
+ dropRequest.setDropPartitions(true);
+ AbstractOperationHandler<DropTableRequest, DropTableHandler.DropTableResult> dropTable =
+ AbstractOperationHandler.offer(handler, dropRequest);
+ DropTableHandler.DropTableResult dropTableResult = dropTable.getResult();
+ if (tableDataShouldBeDeleted
+ && dropTableResult.success()
+ && dropTableResult.partPaths() != null) {
+ partitionPaths.addAll(dropTableResult.partPaths());
+ }
+ }
+
+ if (rs.dropDatabase(request.getCatalogName(), request.getName())) {
+ if (!handler.getTransactionalListeners().isEmpty()) {
+ checkInterrupted();
+ DropDatabaseEvent dropEvent = new DropDatabaseEvent(db, true, handler, isDbReplicationTarget(db));
+ EnvironmentContext context = null;
+ if (!request.isDeleteManagedDir()) {
+ context = new EnvironmentContext();
+ context.putToProperties(hive_metastoreConstants.TXN_ID, String.valueOf(request.getTxnId()));
+ }
+ dropEvent.setEnvironmentContext(context);
+ transactionalListenerResponses =
+ MetaStoreListenerNotifier.notifyEvent(handler.getTransactionalListeners(),
+ EventMessage.EventType.DROP_DATABASE, dropEvent);
+ }
+ success = rs.commitTransaction();
+ }
+ result.setSuccess(success);
+ result.setPartitionPaths(partitionPaths);
+ } finally {
+ if (!success) {
+ rs.rollbackTransaction();
+ }
+ if (!handler.getListeners().isEmpty()) {
+ MetaStoreListenerNotifier.notifyEvent(handler.getListeners(),
+ EventMessage.EventType.DROP_DATABASE,
+ new DropDatabaseEvent(db, success, handler, isDbReplicationTarget(db)),
+ null,
+ transactionalListenerResponses, rs);
+ }
+ }
+ return result;
+ }
+
+ @Override
+ protected void beforeExecute() throws TException, IOException {
+ if ((name = request.getName()) == null) {
+ throw new MetaException("Database name cannot be null.");
+ }
+ RawStore rs = handler.getMS();
+ String catalogName =
+ request.isSetCatalogName() ? request.getCatalogName() : MetaStoreUtils.getDefaultCatalog(handler.getConf());
+ request.setCatalogName(catalogName);
+ db = rs.getDatabase(request.getCatalogName(), request.getName());
+ if (!MetastoreConf.getBoolVar(handler.getConf(), HIVE_IN_TEST) && ReplChangeManager.isSourceOfReplication(db)) {
+ throw new InvalidOperationException("can not drop a database which is a source of replication");
+ }
+
+ List<String> tableNames = defaultEmptyList(rs.getAllTables(request.getCatalogName(), request.getName()));
+ functions = defaultEmptyList(rs.getFunctionsRequest(request.getCatalogName(), request.getName(), null, false));
+ ListStoredProcedureRequest procedureRequest = new ListStoredProcedureRequest(request.getCatalogName());
+ procedureRequest.setDbName(request.getName());
+ procedures = defaultEmptyList(rs.getAllStoredProcedures(procedureRequest));
+ ListPackageRequest pkgRequest = new ListPackageRequest(request.getCatalogName());
+ pkgRequest.setDbName(request.getName());
+ packages = defaultEmptyList(rs.listPackages(pkgRequest));
+
+ if (!request.isCascade()) {
+ if (!tableNames.isEmpty()) {
+ throw new InvalidOperationException(
+ "Database " + db.getName() + " is not empty. One or more tables
exist.");
+ }
+ if (!functions.isEmpty()) {
+ throw new InvalidOperationException(
+ "Database " + db.getName() + " is not empty. One or more functions
exist.");
+ }
+ if (!procedures.isEmpty()) {
+ throw new InvalidOperationException(
+ "Database " + db.getName() + " is not empty. One or more stored
procedures exist.");
+ }
+ if (!packages.isEmpty()) {
+ throw new InvalidOperationException(
+ "Database " + db.getName() + " is not empty. One or more packages
exist.");
+ }
+ }
+ Path path = new Path(db.getLocationUri()).getParent();
+ if (!handler.getWh().isWritable(path)) {
+ throw new MetaException("Database not dropped since its external
warehouse location " + path +
+ " is not writable by " + SecurityUtils.getUser());
+ }
+ path = handler.getWh().getDatabaseManagedPath(db).getParent();
+ if (!handler.getWh().isWritable(path)) {
+ throw new MetaException("Database not dropped since its managed
warehouse location " + path +
+ " is not writable by " + SecurityUtils.getUser());
+ }
+
+ result = new DropDatabaseResult(db);
+ checkFuncPathToCm();
+ // check the permission of table path to be deleted
+ checkTablePathPermission(rs, tableNames);
+ progress = new AtomicReference<>(
+ String.format("Starting to drop the database with %d tables, %d
functions, %d procedures and %d packages.",
+ tables.size(), functions.size(), procedures.size(),
packages.size()));
+
+ ((HMSHandler) handler).firePreEvent(new PreDropDatabaseEvent(db, handler));
+ }
+
+ private void checkFuncPathToCm() {
+ boolean needsCm = ReplChangeManager.isSourceOfReplication(db);
+ List<Path> funcNeedCmPaths = new ArrayList<>();
+ for (Function func : functions) {
+ // if copy of jar to change management fails we fail the metastore transaction, since the
+ // user might delete the jars on HDFS externally after dropping the function, hence having
+ // a copy is required to allow incremental replication to work correctly.
+ if (func.getResourceUris() != null && !func.getResourceUris().isEmpty()) {
+ for (ResourceUri uri : func.getResourceUris()) {
+ if (uri.getUri().toLowerCase().startsWith("hdfs:") && needsCm) {
Review Comment:
This method can return early if `needsCm` is false.
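Roughly (a sketch of the early return, keeping the rest of the method unchanged):
```java
private void checkFuncPathToCm() {
  boolean needsCm = ReplChangeManager.isSourceOfReplication(db);
  if (!needsCm) {
    // No change-manager copy is needed, so there is no point scanning the resource URIs.
    return;
  }
  // ... existing loop over functions and their resource URIs ...
}
```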
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]