This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 0f060b9cb0 Convert Bulk Import metadata actions to use Ample (#3255)
0f060b9cb0 is described below

commit 0f060b9cb0e0811398a88b17ff62fb070ef6a463
Author: Daniel Roberts <ddani...@gmail.com>
AuthorDate: Thu Mar 30 18:27:07 2023 -0400

    Convert Bulk Import metadata actions to use Ample (#3255)
    
    Refactors the addBulkLoadInProgressFlag, removeBulkLoadInProgressFlag,
    and removeBulkLoadEntries methods, moving them from the
    MetadataTableUtil class to ServerAmpleImpl.
    
    Removes the private createWriter method in favor of calling
    context.createBatchWriter() directly with the DataLevel's metadata table.
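
    As it appears in ServerAmpleImpl in the diff below, the replacement
    looks like the following sketch (tableId, context, and candidates
    are assumed to be in scope):

        // DataLevel.of(tableId).metaTable() names the table that stores this
        // table's metadata: the root table for the metadata table, and the
        // metadata table for user tables.
        try (BatchWriter writer =
            context.createBatchWriter(DataLevel.of(tableId).metaTable())) {
          for (StoredTabletFile file : candidates) {
            writer.addMutation(createDeleteMutation(file));
          }
        } catch (MutationsRejectedException | TableNotFoundException e) {
          throw new RuntimeException(e);
        }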
---
 .../accumulo/core/metadata/schema/Ample.java       | 30 ++++++++
 .../accumulo/server/metadata/ServerAmpleImpl.java  | 89 +++++++++++++++++-----
 .../accumulo/server/util/MetadataTableUtil.java    | 48 ------------
 .../manager/tableOps/bulkVer1/BulkImport.java      |  3 +-
 .../tableOps/bulkVer1/CleanUpBulkImport.java       | 11 ++-
 .../manager/tableOps/bulkVer2/BulkImportMove.java  |  4 +-
 .../tableOps/bulkVer2/CleanUpBulkImport.java       | 11 ++-
 7 files changed, 110 insertions(+), 86 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 7e1c5163a4..7db46c77a0 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -339,4 +339,34 @@ public interface Ample {
   default void deleteScanServerFileReferences(String serverAddress, UUID serverSessionId) {
     throw new UnsupportedOperationException();
   }
+
+  /**
+   * Create a Bulk Load In Progress flag in the metadata table.
+   *
+   * @param path The bulk directory filepath.
+   * @param fateTxid The id of the Bulk Import Fate operation.
+   */
+  default void addBulkLoadInProgressFlag(String path, long fateTxid) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Remove a Bulk Load In Progress flag from the metadata table.
+   *
+   * @param path The bulk directory filepath.
+   */
+  default void removeBulkLoadInProgressFlag(String path) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Remove all bulk load entries matching the given transaction id from a table's metadata.
+   *
+   * @param tableId Table ID for transaction removals.
+   * @param tid Transaction ID to remove.
+   */
+  default void removeBulkLoadEntries(TableId tableId, long tid) {
+    throw new UnsupportedOperationException();
+  }
+
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 3f23fca6ab..d23917f0a3 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -24,6 +24,7 @@ import static org.apache.accumulo.server.util.MetadataTableUtil.EMPTY_TEXT;
 
 import java.util.Collection;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -33,13 +34,17 @@ import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Key;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.gc.ReferenceFile;
 import org.apache.accumulo.core.metadata.MetadataTable;
 import org.apache.accumulo.core.metadata.RootTable;
@@ -50,10 +55,12 @@ import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.AmpleImpl;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection.SkewedKeyValue;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ExternalCompactionSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection;
+import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
 import org.apache.accumulo.core.security.Authorizations;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.hadoop.io.Text;
@@ -118,11 +125,11 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
       return;
     }
 
-    try (BatchWriter writer = createWriter(tableId)) {
+    try (BatchWriter writer = context.createBatchWriter(DataLevel.of(tableId).metaTable())) {
       for (StoredTabletFile file : candidates) {
         writer.addMutation(createDeleteMutation(file));
       }
-    } catch (MutationsRejectedException e) {
+    } catch (MutationsRejectedException | TableNotFoundException e) {
       throw new RuntimeException(e);
     }
   }
@@ -130,19 +137,74 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
   @Override
   public void putGcFileAndDirCandidates(TableId tableId, Collection<ReferenceFile> candidates) {
 
-    if (RootTable.ID.equals(tableId)) {
-
+    if (DataLevel.of(tableId) == DataLevel.ROOT) {
       // Directories are unexpected for the root tablet, so convert to stored tablet file
       mutateRootGcCandidates(rgcc -> rgcc.add(candidates.stream()
           .map(reference -> new StoredTabletFile(reference.getMetadataEntry()))));
       return;
     }
 
-    try (BatchWriter writer = createWriter(tableId)) {
+    try (BatchWriter writer = context.createBatchWriter(DataLevel.of(tableId).metaTable())) {
       for (var fileOrDir : candidates) {
         writer.addMutation(createDeleteMutation(fileOrDir));
       }
-    } catch (MutationsRejectedException e) {
+    } catch (MutationsRejectedException | TableNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void addBulkLoadInProgressFlag(String path, long fateTxid) {
+
+    // Bulk Import operations are not supported on the metadata table, so no entries will ever be
+    // required on the root table.
+    Mutation m = new Mutation(BlipSection.getRowPrefix() + path);
+    m.put(EMPTY_TEXT, EMPTY_TEXT, new Value(FateTxId.formatTid(fateTxid)));
+
+    try (BatchWriter bw = context.createBatchWriter(MetadataTable.NAME)) {
+      bw.addMutation(m);
+    } catch (MutationsRejectedException | TableNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void removeBulkLoadInProgressFlag(String path) {
+
+    // Bulk Import operations are not supported on the metadata table, so no entries will ever be
+    // required on the root table.
+    Mutation m = new Mutation(BlipSection.getRowPrefix() + path);
+    m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
+
+    try (BatchWriter bw = context.createBatchWriter(MetadataTable.NAME)) {
+      bw.addMutation(m);
+    } catch (MutationsRejectedException | TableNotFoundException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void removeBulkLoadEntries(TableId tableId, long tid) {
+    Preconditions.checkArgument(DataLevel.of(tableId) == DataLevel.USER);
+    try (
+        Scanner mscanner =
+            new IsolatedScanner(context.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
+        BatchWriter bw = context.createBatchWriter(MetadataTable.NAME)) {
+      mscanner.setRange(new KeyExtent(tableId, null, null).toMetaRange());
+      mscanner.fetchColumnFamily(BulkFileColumnFamily.NAME);
+
+      for (Map.Entry<Key,Value> entry : mscanner) {
+        log.trace("Looking at entry {} with tid {}", entry, tid);
+        long entryTid = BulkFileColumnFamily.getBulkLoadTid(entry.getValue());
+        if (tid == entryTid) {
+          log.trace("deleting entry {}", entry);
+          Key key = entry.getKey();
+          Mutation m = new Mutation(key.getRow());
+          m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
+          bw.addMutation(m);
+        }
+      }
+    } catch (MutationsRejectedException | TableNotFoundException e) {
       throw new RuntimeException(e);
     }
   }
@@ -195,21 +257,6 @@ public class ServerAmpleImpl extends AmpleImpl implements Ample {
     }
   }
 
-  private BatchWriter createWriter(TableId tableId) {
-
-    Preconditions.checkArgument(!RootTable.ID.equals(tableId));
-
-    try {
-      if (MetadataTable.ID.equals(tableId)) {
-        return context.createBatchWriter(RootTable.NAME);
-      } else {
-        return context.createBatchWriter(MetadataTable.NAME);
-      }
-    } catch (TableNotFoundException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   @Override
   public Mutation createDeleteMutation(ReferenceFile tabletFilePathToRemove) {
     return createDelMutation(ValidationUtil.validate(tabletFilePathToRemove).getMetadataEntry());
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 9cdeb1b2d0..08871bd97b 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -47,7 +47,6 @@ import org.apache.accumulo.core.client.AccumuloException;
 import org.apache.accumulo.core.client.AccumuloSecurityException;
 import org.apache.accumulo.core.client.BatchWriter;
 import org.apache.accumulo.core.client.BatchWriterConfig;
-import org.apache.accumulo.core.client.IsolatedScanner;
 import org.apache.accumulo.core.client.MutationsRejectedException;
 import org.apache.accumulo.core.client.Scanner;
 import org.apache.accumulo.core.client.TableNotFoundException;
@@ -62,7 +61,6 @@ import org.apache.accumulo.core.data.Range;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.core.gc.ReferenceFile;
 import org.apache.accumulo.core.metadata.MetadataTable;
@@ -73,9 +71,7 @@ import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.core.metadata.schema.Ample.TabletMutator;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection;
-import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ChoppedColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ClonedColumnFamily;
 import org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.CurrentLocationColumnFamily;
@@ -666,50 +662,6 @@ public class MetadataTableUtil {
     tablet.mutate();
   }
 
-  public static void removeBulkLoadEntries(AccumuloClient client, TableId tableId, long tid)
-      throws Exception {
-    try (
-        Scanner mscanner =
-            new IsolatedScanner(client.createScanner(MetadataTable.NAME, Authorizations.EMPTY));
-        BatchWriter bw = client.createBatchWriter(MetadataTable.NAME)) {
-      mscanner.setRange(new KeyExtent(tableId, null, null).toMetaRange());
-      mscanner.fetchColumnFamily(BulkFileColumnFamily.NAME);
-
-      for (Entry<Key,Value> entry : mscanner) {
-        log.trace("Looking at entry {} with tid {}", entry, tid);
-        long entryTid = BulkFileColumnFamily.getBulkLoadTid(entry.getValue());
-        if (tid == entryTid) {
-          log.trace("deleting entry {}", entry);
-          Key key = entry.getKey();
-          Mutation m = new Mutation(key.getRow());
-          m.putDelete(key.getColumnFamily(), key.getColumnQualifier());
-          bw.addMutation(m);
-        }
-      }
-    }
-  }
-
-  public static void addBulkLoadInProgressFlag(ServerContext context, String path, long fateTxid) {
-
-    Mutation m = new Mutation(BlipSection.getRowPrefix() + path);
-    m.put(EMPTY_TEXT, EMPTY_TEXT, new Value(FateTxId.formatTid(fateTxid)));
-
-    // new KeyExtent is only added to force update to write to the metadata table, not the root
-    // table
-    // because bulk loads aren't supported to the metadata table
-    update(context, m, new KeyExtent(TableId.of("anythingNotMetadata"), null, null));
-  }
-
-  public static void removeBulkLoadInProgressFlag(ServerContext context, String path) {
-
-    Mutation m = new Mutation(BlipSection.getRowPrefix() + path);
-    m.putDelete(EMPTY_TEXT, EMPTY_TEXT);
-
-    // new KeyExtent is only added to force update to write to the metadata table, not the root
-    // table because bulk loads aren't supported to the metadata table
-    update(context, m, new KeyExtent(TableId.of("anythingNotMetadata"), null, null));
-  }
-
   public static SortedMap<Text,SortedMap<ColumnFQ,Value>>
       getTabletEntries(SortedMap<Key,Value> tabletKeyValues, List<ColumnFQ> columns) {
     TreeMap<Text,SortedMap<ColumnFQ,Value>> tabletEntries = new TreeMap<>();
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java
index 45d607e4cb..b07f7c79ab 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/BulkImport.java
@@ -47,7 +47,6 @@ import org.apache.accumulo.manager.tableOps.Utils;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.fs.VolumeManager;
 import org.apache.accumulo.server.tablets.UniqueNameAllocator;
-import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -199,7 +198,7 @@ public class BulkImport extends ManagerRepo {
       TableId tableId, long tid) throws Exception {
     final Path bulkDir = createNewBulkDir(manager, fs, dir, tableId);
 
-    MetadataTableUtil.addBulkLoadInProgressFlag(manager,
+    manager.getAmple().addBulkLoadInProgressFlag(
         "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), tid);
 
     Path dirPath = new Path(dir);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java
index f52291564a..df9564c805 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CleanUpBulkImport.java
@@ -21,16 +21,15 @@ package org.apache.accumulo.manager.tableOps.bulkVer1;
 import java.util.Collections;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.gc.ReferenceFile;
 import org.apache.accumulo.core.master.thrift.BulkImportState;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.manager.tableOps.Utils;
-import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -59,13 +58,13 @@ public class CleanUpBulkImport extends ManagerRepo {
     manager.updateBulkImportStatus(source, BulkImportState.CLEANUP);
     log.debug("removing the bulkDir processing flag file in " + bulk);
     Path bulkDir = new Path(bulk);
-    MetadataTableUtil.removeBulkLoadInProgressFlag(manager.getContext(),
+    Ample ample = manager.getContext().getAmple();
+    ample.removeBulkLoadInProgressFlag(
         "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-    manager.getContext().getAmple().putGcFileAndDirCandidates(tableId,
+    ample.putGcFileAndDirCandidates(tableId,
         Collections.singleton(new ReferenceFile(tableId, bulkDir.toString())));
     log.debug("removing the metadata table markers for loaded files");
-    AccumuloClient client = manager.getContext();
-    MetadataTableUtil.removeBulkLoadEntries(client, tableId, tid);
+    ample.removeBulkLoadEntries(tableId, tid);
     log.debug("releasing HDFS reservations for " + source + " and " + error);
     Utils.unreserveHdfsDirectory(manager, source, tid);
     Utils.unreserveHdfsDirectory(manager, error, tid);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
index ef19bc03ed..5ace9ccb6f 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/BulkImportMove.java
@@ -36,7 +36,6 @@ import org.apache.accumulo.core.master.thrift.BulkImportState;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.server.fs.VolumeManager;
-import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -103,9 +102,8 @@ class BulkImportMove extends ManagerRepo {
    */
   private void moveFiles(long tid, Path sourceDir, Path bulkDir, Manager manager,
       final VolumeManager fs, Map<String,String> renames) throws Exception {
-    MetadataTableUtil.addBulkLoadInProgressFlag(manager.getContext(),
+    manager.getContext().getAmple().addBulkLoadInProgressFlag(
         "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName(), tid);
-
     AccumuloConfiguration aConf = manager.getConfiguration();
     @SuppressWarnings("deprecation")
     int workerCount = aConf.getCount(
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
index 064178ea94..dffe0671e5 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/CleanUpBulkImport.java
@@ -22,16 +22,15 @@ import java.io.IOException;
 import java.util.Collections;
 
 import org.apache.accumulo.core.Constants;
-import org.apache.accumulo.core.client.AccumuloClient;
 import org.apache.accumulo.core.fate.FateTxId;
 import org.apache.accumulo.core.fate.Repo;
 import org.apache.accumulo.core.gc.ReferenceFile;
 import org.apache.accumulo.core.manager.state.tables.TableState;
 import org.apache.accumulo.core.master.thrift.BulkImportState;
+import org.apache.accumulo.core.metadata.schema.Ample;
 import org.apache.accumulo.manager.Manager;
 import org.apache.accumulo.manager.tableOps.ManagerRepo;
 import org.apache.accumulo.manager.tableOps.Utils;
-import org.apache.accumulo.server.util.MetadataTableUtil;
 import org.apache.accumulo.server.zookeeper.TransactionWatcher.ZooArbitrator;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -53,15 +52,15 @@ public class CleanUpBulkImport extends ManagerRepo {
   public Repo<Manager> call(long tid, Manager manager) throws Exception {
     manager.updateBulkImportStatus(info.sourceDir, BulkImportState.CLEANUP);
     log.debug("removing the bulkDir processing flag file in " + info.bulkDir);
+    Ample ample = manager.getContext().getAmple();
     Path bulkDir = new Path(info.bulkDir);
-    MetadataTableUtil.removeBulkLoadInProgressFlag(manager.getContext(),
+    ample.removeBulkLoadInProgressFlag(
         "/" + bulkDir.getParent().getName() + "/" + bulkDir.getName());
-    manager.getContext().getAmple().putGcFileAndDirCandidates(info.tableId,
+    ample.putGcFileAndDirCandidates(info.tableId,
         Collections.singleton(new ReferenceFile(info.tableId, bulkDir.toString())));
     if (info.tableState == TableState.ONLINE) {
       log.debug("removing the metadata table markers for loaded files");
-      AccumuloClient client = manager.getContext();
-      MetadataTableUtil.removeBulkLoadEntries(client, info.tableId, tid);
+      ample.removeBulkLoadEntries(info.tableId, tid);
     }
     Utils.unreserveHdfsDirectory(manager, info.sourceDir, tid);
     Utils.getReadLock(manager, info.tableId, tid).unlock();
