pvary commented on a change in pull request #3053:
URL: https://github.com/apache/hive/pull/3053#discussion_r816484985
##########
File path:
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts,
byte[] filterExp, CheckRes
if (tablePath == null) {
return;
}
- FileSystem fs = tablePath.getFileSystem(conf);
- if (!fs.exists(tablePath)) {
+ final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+ if (!fs[0].exists(tablePath)) {
result.getTablesNotOnFs().add(table.getTableName());
return;
}
Set<Path> partPaths = new HashSet<>();
- // check that the partition folders exist on disk
- for (Partition partition : parts) {
- if (partition == null) {
- // most likely the user specified an invalid partition
- continue;
- }
- Path partPath = getDataLocation(table, partition);
- if (partPath == null) {
- continue;
- }
- fs = partPath.getFileSystem(conf);
+ int threadCount = MetastoreConf.getIntVar(conf,
MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
- CheckResult.PartitionResult prFromMetastore = new
CheckResult.PartitionResult();
- prFromMetastore.setPartitionName(getPartitionName(table, partition));
- prFromMetastore.setTableName(partition.getTableName());
- if (!fs.exists(partPath)) {
- result.getPartitionsNotOnFs().add(prFromMetastore);
- } else {
- result.getCorrectPartitions().add(prFromMetastore);
- }
+ Preconditions.checkArgument(!(threadCount <
1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+ Preconditions.checkArgument(!(threadCount >
30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
- if (partitionExpirySeconds > 0) {
- long currentEpochSecs = Instant.now().getEpochSecond();
- long createdTime = partition.getCreateTime();
- long partitionAgeSeconds = currentEpochSecs - createdTime;
- if (partitionAgeSeconds > partitionExpirySeconds) {
- CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
- pr.setPartitionName(getPartitionName(table, partition));
- pr.setTableName(partition.getTableName());
- result.getExpiredPartitions().add(pr);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s
expiry: {}s", partition.getCatName(),
- partition.getDbName(), partition.getTableName(),
pr.getPartitionName(), createdTime, currentEpochSecs,
- partitionAgeSeconds, partitionExpirySeconds);
- }
+ LOG.debug("Running with threads "+threadCount);
+
+ // For Multi Threaded run, we do not want to wait for All partitions in
queue to be processed,
+ // instead we run in batch to avoid OOM, we set Min and Max Pool Size =
METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+ // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+ final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+ threadCount,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(threadCount),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+
+ try {
+ Queue<Future<String>> futures = new LinkedList<>();
+ // check that the partition folders exist on disk
+ for (Partition partition : parts) {
Review comment:
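Or it is run in the original thread (with CallerRunsPolicy, a task rejected because the queue is full is executed by the submitting thread).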
##########
File path:
standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreChecker.java
##########
@@ -303,56 +308,103 @@ void checkTable(Table table, PartitionIterable parts,
byte[] filterExp, CheckRes
if (tablePath == null) {
return;
}
- FileSystem fs = tablePath.getFileSystem(conf);
- if (!fs.exists(tablePath)) {
+ final FileSystem[] fs = {tablePath.getFileSystem(conf)};
+ if (!fs[0].exists(tablePath)) {
result.getTablesNotOnFs().add(table.getTableName());
return;
}
Set<Path> partPaths = new HashSet<>();
- // check that the partition folders exist on disk
- for (Partition partition : parts) {
- if (partition == null) {
- // most likely the user specified an invalid partition
- continue;
- }
- Path partPath = getDataLocation(table, partition);
- if (partPath == null) {
- continue;
- }
- fs = partPath.getFileSystem(conf);
+ int threadCount = MetastoreConf.getIntVar(conf,
MetastoreConf.ConfVars.METASTORE_MSCK_FS_HANDLER_THREADS_COUNT);
- CheckResult.PartitionResult prFromMetastore = new
CheckResult.PartitionResult();
- prFromMetastore.setPartitionName(getPartitionName(table, partition));
- prFromMetastore.setTableName(partition.getTableName());
- if (!fs.exists(partPath)) {
- result.getPartitionsNotOnFs().add(prFromMetastore);
- } else {
- result.getCorrectPartitions().add(prFromMetastore);
- }
+ Preconditions.checkArgument(!(threadCount <
1),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be less than 1");
+ Preconditions.checkArgument(!(threadCount >
30),"METASTORE_MSCK_FS_HANDLER_THREADS_COUNT cannot be more than 30");
- if (partitionExpirySeconds > 0) {
- long currentEpochSecs = Instant.now().getEpochSecond();
- long createdTime = partition.getCreateTime();
- long partitionAgeSeconds = currentEpochSecs - createdTime;
- if (partitionAgeSeconds > partitionExpirySeconds) {
- CheckResult.PartitionResult pr = new CheckResult.PartitionResult();
- pr.setPartitionName(getPartitionName(table, partition));
- pr.setTableName(partition.getTableName());
- result.getExpiredPartitions().add(pr);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{}.{}.{}.{} expired. createdAt: {} current: {} age: {}s
expiry: {}s", partition.getCatName(),
- partition.getDbName(), partition.getTableName(),
pr.getPartitionName(), createdTime, currentEpochSecs,
- partitionAgeSeconds, partitionExpirySeconds);
- }
+ LOG.debug("Running with threads "+threadCount);
+
+ // For Multi Threaded run, we do not want to wait for All partitions in
queue to be processed,
+ // instead we run in batch to avoid OOM, we set Min and Max Pool Size =
METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+ // and Waiting Queue size = METASTORE_MSCK_FS_HANDLER_THREADS_COUNT
+
+ final ExecutorService pool = new ThreadPoolExecutor(threadCount,
+ threadCount,
+ 0L,
+ TimeUnit.MILLISECONDS,
+ new ArrayBlockingQueue<>(threadCount),
+ new ThreadPoolExecutor.CallerRunsPolicy());
+
+ try {
+ Queue<Future<String>> futures = new LinkedList<>();
+ // check that the partition folders exist on disk
+ for (Partition partition : parts) {
Review comment:
Check whether the PartitionIterable (and the iterator it returns) is thread-safe.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]