http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 7cfc854..80950a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -77,6 +77,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsStatus; +import org.apache.hadoop.fs.FsTracer; import org.apache.hadoop.fs.HdfsBlockLocation; import org.apache.hadoop.fs.InvalidPathException; import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum; @@ -172,24 +173,19 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenRenewer; -import org.apache.hadoop.tracing.SpanReceiverHost; -import org.apache.hadoop.tracing.TraceUtils; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; -import org.apache.htrace.Sampler; -import org.apache.htrace.SamplerBuilder; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.TraceScope; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.google.common.net.InetAddresses; +import org.apache.htrace.core.Tracer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -212,6 +208,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour private final Configuration conf; + private final Tracer tracer; private final DfsClientConf dfsClientConf; final ClientProtocol namenode; /* The service used for delegation tokens */ @@ -238,7 +235,6 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private static final DFSHedgedReadMetrics HEDGED_READ_METRIC = new DFSHedgedReadMetrics(); private static ThreadPoolExecutor HEDGED_READ_THREAD_POOL; - private final Sampler<?> traceSampler; private final int smallBufferSize; public DfsClientConf getConf() { @@ -302,11 +298,8 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public DFSClient(URI nameNodeUri, ClientProtocol rpcNamenode, Configuration conf, FileSystem.Statistics stats) throws IOException { - SpanReceiverHost.get(conf, HdfsClientConfigKeys.DFS_CLIENT_HTRACE_PREFIX); - traceSampler = new SamplerBuilder(TraceUtils. - wrapHadoopConf(HdfsClientConfigKeys.DFS_CLIENT_HTRACE_PREFIX, conf)) - .build(); // Copy only the required DFSClient configuration + this.tracer = FsTracer.get(conf); this.dfsClientConf = new DfsClientConf(conf); this.conf = conf; this.stats = stats; @@ -628,7 +621,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#getPreferredBlockSize(String) */ public long getBlockSize(String f) throws IOException { - TraceScope scope = getPathTraceScope("getBlockSize", f); + TraceScope scope = newPathTraceScope("getBlockSize", f); try { return namenode.getPreferredBlockSize(f); } catch (IOException ie) { @@ -670,7 +663,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer) throws IOException { assert dtService != null; - TraceScope scope = Trace.startSpan("getDelegationToken", traceSampler); + TraceScope scope = tracer.newScope("getDelegationToken"); try { Token<DelegationTokenIdentifier> token = namenode.getDelegationToken(renewer); @@ -826,7 +819,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, @VisibleForTesting public LocatedBlocks getLocatedBlocks(String src, long start, long length) throws IOException { - TraceScope scope = getPathTraceScope("getBlockLocations", src); + TraceScope scope = newPathTraceScope("getBlockLocations", src); try { return callGetBlockLocations(namenode, src, start, length); } finally { @@ -858,7 +851,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, boolean recoverLease(String src) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("recoverLease", src); + TraceScope scope = newPathTraceScope("recoverLease", src); try { return namenode.recoverLease(src, clientName); } catch (RemoteException re) { @@ -884,7 +877,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public BlockLocation[] getBlockLocations(String src, long start, long length) throws IOException, UnresolvedLinkException { - TraceScope scope = getPathTraceScope("getBlockLocations", src); + TraceScope scope = newPathTraceScope("getBlockLocations", src); try { LocatedBlocks blocks = getLocatedBlocks(src, start, length); BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks); @@ -950,14 +943,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, // Make RPCs to the datanodes to get volume locations for its replicas TraceScope scope = - Trace.startSpan("getBlockStorageLocations", traceSampler); + tracer.newScope("getBlockStorageLocations"); Map<DatanodeInfo, HdfsBlocksMetadata> metadatas; try { metadatas = BlockStorageLocationUtil. queryDatanodesForHdfsBlocksMetadata(conf, datanodeBlocks, getConf().getFileBlockStorageLocationsNumThreads(), getConf().getFileBlockStorageLocationsTimeoutMs(), - getConf().isConnectToDnViaHostname()); + getConf().isConnectToDnViaHostname(), tracer, scope.getSpanId()); if (LOG.isTraceEnabled()) { LOG.trace("metadata returned: " + Joiner.on("\n").withKeyValueSeparator("=").join(metadatas)); @@ -983,7 +976,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ private KeyVersion decryptEncryptedDataEncryptionKey(FileEncryptionInfo feInfo) throws IOException { - TraceScope scope = Trace.startSpan("decryptEDEK", traceSampler); + TraceScope scope = tracer.newScope("decryptEDEK"); try { KeyProvider provider = getKeyProvider(); if (provider == null) { @@ -1139,7 +1132,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throws IOException, UnresolvedLinkException { checkOpen(); // Get block info from namenode - TraceScope scope = getPathTraceScope("newDFSInputStream", src); + TraceScope scope = newPathTraceScope("newDFSInputStream", src); try { return new DFSInputStream(this, src, verifyChecksum, null); } finally { @@ -1384,7 +1377,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void createSymlink(String target, String link, boolean createParent) throws IOException { - TraceScope scope = getPathTraceScope("createSymlink", target); + TraceScope scope = newPathTraceScope("createSymlink", target); try { final FsPermission dirPerm = applyUMask(null); namenode.createSymlink(target, link, dirPerm, createParent); @@ -1410,7 +1403,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public String getLinkTarget(String path) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("getLinkTarget", path); + TraceScope scope = newPathTraceScope("getLinkTarget", path); try { return namenode.getLinkTarget(path); } catch (RemoteException re) { @@ -1506,7 +1499,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public boolean setReplication(String src, short replication) throws IOException { - TraceScope scope = getPathTraceScope("setReplication", src); + TraceScope scope = newPathTraceScope("setReplication", src); try { return namenode.setReplication(src, replication); } catch(RemoteException re) { @@ -1529,7 +1522,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void setStoragePolicy(String src, String policyName) throws IOException { - TraceScope scope = getPathTraceScope("setStoragePolicy", src); + TraceScope scope = newPathTraceScope("setStoragePolicy", src); try { namenode.setStoragePolicy(src, policyName); } catch (RemoteException e) { @@ -1550,7 +1543,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public BlockStoragePolicy getStoragePolicy(String path) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("getStoragePolicy", path); + TraceScope scope = newPathTraceScope("getStoragePolicy", path); try { return namenode.getStoragePolicy(path); } catch (RemoteException e) { @@ -1567,7 +1560,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @return All the existing storage policies */ public BlockStoragePolicy[] getStoragePolicies() throws IOException { - TraceScope scope = Trace.startSpan("getStoragePolicies", traceSampler); + TraceScope scope = tracer.newScope("getStoragePolicies"); try { return namenode.getStoragePolicies(); } finally { @@ -1583,7 +1576,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, @Deprecated public boolean rename(String src, String dst) throws IOException { checkOpen(); - TraceScope scope = getSrcDstTraceScope("rename", src, dst); + TraceScope scope = newSrcDstTraceScope("rename", src, dst); try { return namenode.rename(src, dst); } catch(RemoteException re) { @@ -1604,7 +1597,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void concat(String trg, String [] srcs) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("concat", traceSampler); + TraceScope scope = tracer.newScope("concat"); try { namenode.concat(trg, srcs); } catch(RemoteException re) { @@ -1622,7 +1615,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void rename(String src, String dst, Options.Rename... options) throws IOException { checkOpen(); - TraceScope scope = getSrcDstTraceScope("rename2", src, dst); + TraceScope scope = newSrcDstTraceScope("rename2", src, dst); try { namenode.rename2(src, dst, options); } catch(RemoteException re) { @@ -1651,11 +1644,14 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throw new HadoopIllegalArgumentException( "Cannot truncate to a negative file size: " + newLength + "."); } + TraceScope scope = newPathTraceScope("truncate", src); try { return namenode.truncate(src, newLength, clientName); } catch (RemoteException re) { throw re.unwrapRemoteException(AccessControlException.class, UnresolvedPathException.class); + } finally { + scope.close(); } } @@ -1678,7 +1674,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public boolean delete(String src, boolean recursive) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("delete", src); + TraceScope scope = newPathTraceScope("delete", src); try { return namenode.delete(src, recursive); } catch(RemoteException re) { @@ -1720,7 +1716,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public DirectoryListing listPaths(String src, byte[] startAfter, boolean needLocation) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("listPaths", src); + TraceScope scope = newPathTraceScope("listPaths", src); try { return namenode.getListing(src, startAfter, needLocation); } catch(RemoteException re) { @@ -1742,7 +1738,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public HdfsFileStatus getFileInfo(String src) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("getFileInfo", src); + TraceScope scope = newPathTraceScope("getFileInfo", src); try { return namenode.getFileInfo(src); } catch(RemoteException re) { @@ -1760,7 +1756,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public boolean isFileClosed(String src) throws IOException{ checkOpen(); - TraceScope scope = getPathTraceScope("isFileClosed", src); + TraceScope scope = newPathTraceScope("isFileClosed", src); try { return namenode.isFileClosed(src); } catch(RemoteException re) { @@ -1782,7 +1778,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public HdfsFileStatus getFileLinkInfo(String src) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("getFileLinkInfo", src); + TraceScope scope = newPathTraceScope("getFileLinkInfo", src); try { return namenode.getFileLinkInfo(src); } catch(RemoteException re) { @@ -2085,7 +2081,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void setPermission(String src, FsPermission permission) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("setPermission", src); + TraceScope scope = newPathTraceScope("setPermission", src); try { namenode.setPermission(src, permission); } catch(RemoteException re) { @@ -2110,7 +2106,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void setOwner(String src, String username, String groupname) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("setOwner", src); + TraceScope scope = newPathTraceScope("setOwner", src); try { namenode.setOwner(src, username, groupname); } catch(RemoteException re) { @@ -2126,7 +2122,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, private long[] callGetStats() throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("getStats", traceSampler); + TraceScope scope = tracer.newScope("getStats"); try { return namenode.getStats(); } finally { @@ -2185,7 +2181,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, String cookie) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("listCorruptFileBlocks", path); + TraceScope scope = newPathTraceScope("listCorruptFileBlocks", path); try { return namenode.listCorruptFileBlocks(path, cookie); } finally { @@ -2196,7 +2192,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public DatanodeInfo[] datanodeReport(DatanodeReportType type) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("datanodeReport", traceSampler); + TraceScope scope = tracer.newScope("datanodeReport"); try { return namenode.getDatanodeReport(type); } finally { @@ -2208,7 +2204,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, DatanodeReportType type) throws IOException { checkOpen(); TraceScope scope = - Trace.startSpan("datanodeStorageReport", traceSampler); + tracer.newScope("datanodeStorageReport"); try { return namenode.getDatanodeStorageReport(type); } finally { @@ -2238,7 +2234,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public boolean setSafeMode(SafeModeAction action, boolean isChecked) throws IOException{ TraceScope scope = - Trace.startSpan("setSafeMode", traceSampler); + tracer.newScope("setSafeMode"); try { return namenode.setSafeMode(action, isChecked); } finally { @@ -2257,7 +2253,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public String createSnapshot(String snapshotRoot, String snapshotName) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("createSnapshot", traceSampler); + TraceScope scope = tracer.newScope("createSnapshot"); try { return namenode.createSnapshot(snapshotRoot, snapshotName); } catch(RemoteException re) { @@ -2279,7 +2275,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void deleteSnapshot(String snapshotRoot, String snapshotName) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("deleteSnapshot", traceSampler); + TraceScope scope = tracer.newScope("deleteSnapshot"); try { namenode.deleteSnapshot(snapshotRoot, snapshotName); } catch(RemoteException re) { @@ -2300,7 +2296,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void renameSnapshot(String snapshotDir, String snapshotOldName, String snapshotNewName) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("renameSnapshot", traceSampler); + TraceScope scope = tracer.newScope("renameSnapshot"); try { namenode.renameSnapshot(snapshotDir, snapshotOldName, snapshotNewName); } catch(RemoteException re) { @@ -2319,8 +2315,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("getSnapshottableDirListing", - traceSampler); + TraceScope scope = tracer.newScope("getSnapshottableDirListing"); try { return namenode.getSnapshottableDirListing(); } catch(RemoteException re) { @@ -2337,7 +2332,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void allowSnapshot(String snapshotRoot) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("allowSnapshot", traceSampler); + TraceScope scope = tracer.newScope("allowSnapshot"); try { namenode.allowSnapshot(snapshotRoot); } catch (RemoteException re) { @@ -2354,7 +2349,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void disallowSnapshot(String snapshotRoot) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("disallowSnapshot", traceSampler); + TraceScope scope = tracer.newScope("disallowSnapshot"); try { namenode.disallowSnapshot(snapshotRoot); } catch (RemoteException re) { @@ -2372,7 +2367,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public SnapshotDiffReport getSnapshotDiffReport(String snapshotDir, String fromSnapshot, String toSnapshot) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("getSnapshotDiffReport", traceSampler); + TraceScope scope = tracer.newScope("getSnapshotDiffReport"); try { return namenode.getSnapshotDiffReport(snapshotDir, fromSnapshot, toSnapshot); @@ -2386,7 +2381,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public long addCacheDirective( CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("addCacheDirective", traceSampler); + TraceScope scope = tracer.newScope("addCacheDirective"); try { return namenode.addCacheDirective(info, flags); } catch (RemoteException re) { @@ -2399,7 +2394,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void modifyCacheDirective( CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("modifyCacheDirective", traceSampler); + TraceScope scope = tracer.newScope("modifyCacheDirective"); try { namenode.modifyCacheDirective(info, flags); } catch (RemoteException re) { @@ -2412,7 +2407,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void removeCacheDirective(long id) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("removeCacheDirective", traceSampler); + TraceScope scope = tracer.newScope("removeCacheDirective"); try { namenode.removeCacheDirective(id); } catch (RemoteException re) { @@ -2424,12 +2419,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public RemoteIterator<CacheDirectiveEntry> listCacheDirectives( CacheDirectiveInfo filter) throws IOException { - return new CacheDirectiveIterator(namenode, filter, traceSampler); + return new CacheDirectiveIterator(namenode, filter, tracer); } public void addCachePool(CachePoolInfo info) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("addCachePool", traceSampler); + TraceScope scope = tracer.newScope("addCachePool"); try { namenode.addCachePool(info); } catch (RemoteException re) { @@ -2441,7 +2436,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void modifyCachePool(CachePoolInfo info) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("modifyCachePool", traceSampler); + TraceScope scope = tracer.newScope("modifyCachePool"); try { namenode.modifyCachePool(info); } catch (RemoteException re) { @@ -2453,7 +2448,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void removeCachePool(String poolName) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("removeCachePool", traceSampler); + TraceScope scope = tracer.newScope("removeCachePool"); try { namenode.removeCachePool(poolName); } catch (RemoteException re) { @@ -2464,7 +2459,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } public RemoteIterator<CachePoolEntry> listCachePools() throws IOException { - return new CachePoolIterator(namenode, traceSampler); + return new CachePoolIterator(namenode, tracer); } /** @@ -2473,7 +2468,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#saveNamespace() */ void saveNamespace() throws AccessControlException, IOException { - TraceScope scope = Trace.startSpan("saveNamespace", traceSampler); + TraceScope scope = tracer.newScope("saveNamespace"); try { namenode.saveNamespace(); } catch(RemoteException re) { @@ -2490,7 +2485,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#rollEdits() */ long rollEdits() throws AccessControlException, IOException { - TraceScope scope = Trace.startSpan("rollEdits", traceSampler); + TraceScope scope = tracer.newScope("rollEdits"); try { return namenode.rollEdits(); } catch(RemoteException re) { @@ -2512,7 +2507,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ boolean restoreFailedStorage(String arg) throws AccessControlException, IOException{ - TraceScope scope = Trace.startSpan("restoreFailedStorage", traceSampler); + TraceScope scope = tracer.newScope("restoreFailedStorage"); try { return namenode.restoreFailedStorage(arg); } finally { @@ -2528,7 +2523,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#refreshNodes() */ public void refreshNodes() throws IOException { - TraceScope scope = Trace.startSpan("refreshNodes", traceSampler); + TraceScope scope = tracer.newScope("refreshNodes"); try { namenode.refreshNodes(); } finally { @@ -2542,7 +2537,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#metaSave(String) */ public void metaSave(String pathname) throws IOException { - TraceScope scope = Trace.startSpan("metaSave", traceSampler); + TraceScope scope = tracer.newScope("metaSave"); try { namenode.metaSave(pathname); } finally { @@ -2559,7 +2554,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#setBalancerBandwidth(long) */ public void setBalancerBandwidth(long bandwidth) throws IOException { - TraceScope scope = Trace.startSpan("setBalancerBandwidth", traceSampler); + TraceScope scope = tracer.newScope("setBalancerBandwidth"); try { namenode.setBalancerBandwidth(bandwidth); } finally { @@ -2571,7 +2566,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#finalizeUpgrade() */ public void finalizeUpgrade() throws IOException { - TraceScope scope = Trace.startSpan("finalizeUpgrade", traceSampler); + TraceScope scope = tracer.newScope("finalizeUpgrade"); try { namenode.finalizeUpgrade(); } finally { @@ -2580,7 +2575,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action) throws IOException { - TraceScope scope = Trace.startSpan("rollingUpgrade", traceSampler); + TraceScope scope = tracer.newScope("rollingUpgrade"); try { return namenode.rollingUpgrade(action); } finally { @@ -2638,7 +2633,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, if(LOG.isDebugEnabled()) { LOG.debug(src + ": masked=" + absPermission); } - TraceScope scope = Trace.startSpan("mkdir", traceSampler); + TraceScope scope = tracer.newScope("mkdir"); try { return namenode.mkdirs(src, absPermission, createParent); } catch(RemoteException re) { @@ -2665,7 +2660,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, * @see ClientProtocol#getContentSummary(String) */ ContentSummary getContentSummary(String src) throws IOException { - TraceScope scope = getPathTraceScope("getContentSummary", src); + TraceScope scope = newPathTraceScope("getContentSummary", src); try { return namenode.getContentSummary(src); } catch(RemoteException re) { @@ -2693,7 +2688,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, storagespaceQuota); } - TraceScope scope = getPathTraceScope("setQuota", src); + TraceScope scope = newPathTraceScope("setQuota", src); try { // Pass null as storage type for traditional namespace/storagespace quota. namenode.setQuota(src, namespaceQuota, storagespaceQuota, null); @@ -2728,7 +2723,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, throw new IllegalArgumentException("Don't support Quota for storage type : " + type.toString()); } - TraceScope scope = getPathTraceScope("setQuotaByStorageType", src); + TraceScope scope = newPathTraceScope("setQuotaByStorageType", src); try { namenode.setQuota(src, HdfsConstants.QUOTA_DONT_SET, quota, type); } catch (RemoteException re) { @@ -2748,7 +2743,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, */ public void setTimes(String src, long mtime, long atime) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("setTimes", src); + TraceScope scope = newPathTraceScope("setTimes", src); try { namenode.setTimes(src, mtime, atime); } catch(RemoteException re) { @@ -2809,7 +2804,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void modifyAclEntries(String src, List<AclEntry> aclSpec) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("modifyAclEntries", src); + TraceScope scope = newPathTraceScope("modifyAclEntries", src); try { namenode.modifyAclEntries(src, aclSpec); } catch(RemoteException re) { @@ -2828,7 +2823,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void removeAclEntries(String src, List<AclEntry> aclSpec) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("removeAclEntries", traceSampler); + TraceScope scope = tracer.newScope("removeAclEntries"); try { namenode.removeAclEntries(src, aclSpec); } catch(RemoteException re) { @@ -2846,7 +2841,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void removeDefaultAcl(String src) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("removeDefaultAcl", traceSampler); + TraceScope scope = tracer.newScope("removeDefaultAcl"); try { namenode.removeDefaultAcl(src); } catch(RemoteException re) { @@ -2864,7 +2859,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void removeAcl(String src) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("removeAcl", traceSampler); + TraceScope scope = tracer.newScope("removeAcl"); try { namenode.removeAcl(src); } catch(RemoteException re) { @@ -2882,7 +2877,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void setAcl(String src, List<AclEntry> aclSpec) throws IOException { checkOpen(); - TraceScope scope = Trace.startSpan("setAcl", traceSampler); + TraceScope scope = tracer.newScope("setAcl"); try { namenode.setAcl(src, aclSpec); } catch(RemoteException re) { @@ -2900,7 +2895,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public AclStatus getAclStatus(String src) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("getAclStatus", src); + TraceScope scope = newPathTraceScope("getAclStatus", src); try { return namenode.getAclStatus(src); } catch(RemoteException re) { @@ -2916,7 +2911,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void createEncryptionZone(String src, String keyName) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("createEncryptionZone", src); + TraceScope scope = newPathTraceScope("createEncryptionZone", src); try { namenode.createEncryptionZone(src, keyName); } catch (RemoteException re) { @@ -2931,7 +2926,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public EncryptionZone getEZForPath(String src) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("getEZForPath", src); + TraceScope scope = newPathTraceScope("getEZForPath", src); try { return namenode.getEZForPath(src); } catch (RemoteException re) { @@ -2945,13 +2940,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public RemoteIterator<EncryptionZone> listEncryptionZones() throws IOException { checkOpen(); - return new EncryptionZoneIterator(namenode, traceSampler); + return new EncryptionZoneIterator(namenode, tracer); } public void setXAttr(String src, String name, byte[] value, EnumSet<XAttrSetFlag> flag) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("setXAttr", src); + TraceScope scope = newPathTraceScope("setXAttr", src); try { namenode.setXAttr(src, XAttrHelper.buildXAttr(name, value), flag); } catch (RemoteException re) { @@ -2968,7 +2963,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public byte[] getXAttr(String src, String name) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("getXAttr", src); + TraceScope scope = newPathTraceScope("getXAttr", src); try { final List<XAttr> xAttrs = XAttrHelper.buildXAttrAsList(name); final List<XAttr> result = namenode.getXAttrs(src, xAttrs); @@ -2984,7 +2979,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public Map<String, byte[]> getXAttrs(String src) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("getXAttrs", src); + TraceScope scope = newPathTraceScope("getXAttrs", src); try { return XAttrHelper.buildXAttrMap(namenode.getXAttrs(src, null)); } catch(RemoteException re) { @@ -2999,7 +2994,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public Map<String, byte[]> getXAttrs(String src, List<String> names) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("getXAttrs", src); + TraceScope scope = newPathTraceScope("getXAttrs", src); try { return XAttrHelper.buildXAttrMap(namenode.getXAttrs( src, XAttrHelper.buildXAttrs(names))); @@ -3015,7 +3010,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public List<String> listXAttrs(String src) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("listXAttrs", src); + TraceScope scope = newPathTraceScope("listXAttrs", src); try { final Map<String, byte[]> xattrs = XAttrHelper.buildXAttrMap(namenode.listXAttrs(src)); @@ -3031,7 +3026,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void removeXAttr(String src, String name) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("removeXAttr", src); + TraceScope scope = newPathTraceScope("removeXAttr", src); try { namenode.removeXAttr(src, XAttrHelper.buildXAttr(name)); } catch(RemoteException re) { @@ -3048,7 +3043,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, public void checkAccess(String src, FsAction mode) throws IOException { checkOpen(); - TraceScope scope = getPathTraceScope("checkAccess", src); + TraceScope scope = newPathTraceScope("checkAccess", src); try { namenode.checkAccess(src, mode); } catch (RemoteException re) { @@ -3061,12 +3056,13 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { - return new DFSInotifyEventInputStream(traceSampler, namenode); + return new DFSInotifyEventInputStream(namenode, tracer); } public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid) throws IOException { - return new DFSInotifyEventInputStream(traceSampler, namenode, lastReadTxid); + return new DFSInotifyEventInputStream(namenode, tracer, + lastReadTxid); } @Override // RemotePeerFactory @@ -3176,28 +3172,26 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, return saslClient; } - TraceScope getPathTraceScope(String description, String path) { - TraceScope scope = Trace.startSpan(description, traceSampler); - Span span = scope.getSpan(); - if (span != null) { - if (path != null) { - span.addKVAnnotation("path", path); - } + TraceScope newPathTraceScope(String description, String path) { + TraceScope scope = tracer.newScope(description); + if (path != null) { + scope.addKVAnnotation("path", path); } return scope; } - TraceScope getSrcDstTraceScope(String description, String src, String dst) { - TraceScope scope = Trace.startSpan(description, traceSampler); - Span span = scope.getSpan(); - if (span != null) { - if (src != null) { - span.addKVAnnotation("src", src); - } - if (dst != null) { - span.addKVAnnotation("dst", dst); - } + TraceScope newSrcDstTraceScope(String description, String src, String dst) { + TraceScope scope = tracer.newScope(description); + if (src != null) { + scope.addKVAnnotation("src", src); + } + if (dst != null) { + scope.addKVAnnotation("dst", dst); } return scope; } + + Tracer getTracer() { + return tracer; + } }
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java index 11a1d29..c98cd5f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java @@ -26,9 +26,8 @@ import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.inotify.MissingEventsException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.util.Time; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,11 +46,6 @@ public class DFSInotifyEventInputStream { public static final Logger LOG = LoggerFactory.getLogger( DFSInotifyEventInputStream.class); - /** - * The trace sampler to use when making RPCs to the NameNode. - */ - private final Sampler<?> traceSampler; - private final ClientProtocol namenode; private Iterator<EventBatch> it; private long lastReadTxid; @@ -65,20 +59,22 @@ public class DFSInotifyEventInputStream { */ private Random rng = new Random(); + private final Tracer tracer; + private static final int INITIAL_WAIT_MS = 10; - DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode) + DFSInotifyEventInputStream(ClientProtocol namenode, Tracer tracer) throws IOException { // Only consider new transaction IDs. - this(traceSampler, namenode, namenode.getCurrentEditLogTxid()); + this(namenode, tracer, namenode.getCurrentEditLogTxid()); } - DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode, - long lastReadTxid) throws IOException { - this.traceSampler = traceSampler; + DFSInotifyEventInputStream(ClientProtocol namenode, + Tracer tracer, long lastReadTxid) throws IOException { this.namenode = namenode; this.it = Iterators.emptyIterator(); this.lastReadTxid = lastReadTxid; + this.tracer = tracer; } /** @@ -98,8 +94,7 @@ public class DFSInotifyEventInputStream { * The next available batch of events will be returned. */ public EventBatch poll() throws IOException, MissingEventsException { - TraceScope scope = - Trace.startSpan("inotifyPoll", traceSampler); + TraceScope scope = tracer.newScope("inotifyPoll"); try { // need to keep retrying until the NN sends us the latest committed txid if (lastReadTxid == -1) { @@ -180,7 +175,7 @@ public class DFSInotifyEventInputStream { */ public EventBatch poll(long time, TimeUnit tu) throws IOException, InterruptedException, MissingEventsException { - TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler); + TraceScope scope = tracer.newScope("inotifyPollWithTimeout"); EventBatch next = null; try { long initialTime = Time.monotonicNow(); @@ -217,7 +212,7 @@ public class DFSInotifyEventInputStream { */ public EventBatch take() throws IOException, InterruptedException, MissingEventsException { - TraceScope scope = Trace.startSpan("inotifyTake", traceSampler); + TraceScope scope = tracer.newScope("inotifyTake"); EventBatch next = null; try { int nextWaitMin = INITIAL_WAIT_MS; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java index 139a27c..7101753 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java @@ -55,6 +55,7 @@ import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.HasEnhancedByteBufferAccess; import org.apache.hadoop.fs.ReadOption; import org.apache.hadoop.fs.StorageType; @@ -78,9 +79,9 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.IdentityHashStore; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import com.google.common.annotations.VisibleForTesting; @@ -678,6 +679,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, setClientCacheContext(dfsClient.getClientContext()). setUserGroupInformation(dfsClient.ugi). setConfiguration(dfsClient.getConfiguration()). + setTracer(dfsClient.getTracer()). build(); } @@ -941,7 +943,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, public synchronized int read(final byte buf[], int off, int len) throws IOException { ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf); TraceScope scope = - dfsClient.getPathTraceScope("DFSInputStream#byteArrayRead", src); + dfsClient.newPathTraceScope("DFSInputStream#byteArrayRead", src); try { return readWithStrategy(byteArrayReader, off, len); } finally { @@ -953,7 +955,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, public synchronized int read(final ByteBuffer buf) throws IOException { ReaderStrategy byteBufferReader = new ByteBufferStrategy(buf); TraceScope scope = - dfsClient.getPathTraceScope("DFSInputStream#byteBufferRead", src); + dfsClient.newPathTraceScope("DFSInputStream#byteBufferRead", src); try { return readWithStrategy(byteBufferReader, 0, buf.remaining()); } finally { @@ -1120,14 +1122,14 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, final ByteBuffer bb, final Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap, final int hedgedReadId) { - final Span parentSpan = Trace.currentSpan(); + final SpanId parentSpanId = Tracer.getCurrentSpanId(); return new Callable<ByteBuffer>() { @Override public ByteBuffer call() throws Exception { byte[] buf = bb.array(); int offset = bb.position(); - TraceScope scope = - Trace.startSpan("hedgedRead" + hedgedReadId, parentSpan); + TraceScope scope = dfsClient.getTracer(). + newScope("hedgedRead" + hedgedReadId, parentSpanId); try { actualGetFromOneDataNode(datanode, block, start, end, buf, offset, corruptedBlockMap); @@ -1449,8 +1451,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead, @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { - TraceScope scope = - dfsClient.getPathTraceScope("DFSInputStream#byteArrayPread", src); + TraceScope scope = dfsClient. + newPathTraceScope("DFSInputStream#byteArrayPread", src); try { return pread(position, buffer, offset, length); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 5466936..6872683 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -32,6 +32,7 @@ import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.FSOutputSummer; import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileEncryptionInfo; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.ParentNotDirectoryException; import org.apache.hadoop.fs.Syncable; import org.apache.hadoop.fs.permission.FsPermission; @@ -62,9 +63,7 @@ import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum.Type; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.TraceScope; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -227,7 +226,7 @@ public class DFSOutputStream extends FSOutputSummer short replication, long blockSize, Progressable progress, int buffersize, DataChecksum checksum, String[] favoredNodes) throws IOException { TraceScope scope = - dfsClient.getPathTraceScope("newStreamForCreate", src); + dfsClient.newPathTraceScope("newStreamForCreate", src); try { HdfsFileStatus stat = null; @@ -350,7 +349,7 @@ public class DFSOutputStream extends FSOutputSummer LocatedBlock lastBlock, HdfsFileStatus stat, DataChecksum checksum, String[] favoredNodes) throws IOException { TraceScope scope = - dfsClient.getPathTraceScope("newStreamForAppend", src); + dfsClient.newPathTraceScope("newStreamForAppend", src); try { final DFSOutputStream out = new DFSOutputStream(dfsClient, src, flags, progress, lastBlock, stat, checksum, favoredNodes); @@ -375,7 +374,7 @@ public class DFSOutputStream extends FSOutputSummer } protected TraceScope createWriteTraceScope() { - return dfsClient.getPathTraceScope("DFSOutputStream#write", src); + return dfsClient.newPathTraceScope("DFSOutputStream#write", src); } // @see FSOutputSummer#writeChunk() @@ -495,7 +494,7 @@ public class DFSOutputStream extends FSOutputSummer @Override public void hflush() throws IOException { TraceScope scope = - dfsClient.getPathTraceScope("hflush", src); + dfsClient.newPathTraceScope("hflush", src); try { flushOrSync(false, EnumSet.noneOf(SyncFlag.class)); } finally { @@ -506,7 +505,7 @@ public class DFSOutputStream extends FSOutputSummer @Override public void hsync() throws IOException { TraceScope scope = - dfsClient.getPathTraceScope("hsync", src); + dfsClient.newPathTraceScope("hsync", src); try { flushOrSync(true, EnumSet.noneOf(SyncFlag.class)); } finally { @@ -529,7 +528,7 @@ public class DFSOutputStream extends FSOutputSummer */ public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException { TraceScope scope = - dfsClient.getPathTraceScope("hsync", src); + dfsClient.newPathTraceScope("hsync", src); try { flushOrSync(true, syncFlags); } finally { @@ -770,7 +769,7 @@ public class DFSOutputStream extends FSOutputSummer @Override public synchronized void close() throws IOException { TraceScope scope = - dfsClient.getPathTraceScope("DFSOutputStream#close", src); + dfsClient.newPathTraceScope("DFSOutputStream#close", src); try { closeImpl(); } finally { @@ -799,7 +798,7 @@ public class DFSOutputStream extends FSOutputSummer // get last block before destroying the streamer ExtendedBlock lastBlock = getStreamer().getBlock(); closeThreads(false); - TraceScope scope = Trace.startSpan("completeFile", Sampler.NEVER); + TraceScope scope = dfsClient.getTracer().newScope("completeFile"); try { completeFile(lastBlock); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java index 22055c3..9a8ca6f 100755 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSPacket.java @@ -27,7 +27,9 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; import org.apache.hadoop.hdfs.util.ByteArrayManager; -import org.apache.htrace.Span; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.TraceScope; /**************************************************************** * DFSPacket is used by DataStreamer and DFSOutputStream. @@ -38,7 +40,7 @@ import org.apache.htrace.Span; @InterfaceAudience.Private class DFSPacket { public static final long HEART_BEAT_SEQNO = -1L; - private static long[] EMPTY = new long[0]; + private static SpanId[] EMPTY = new SpanId[0]; private final long seqno; // sequence number of buffer in block private final long offsetInBlock; // offset in block private boolean syncBlock; // this packet forces the current block to disk @@ -65,9 +67,9 @@ class DFSPacket { private int checksumPos; private final int dataStart; private int dataPos; - private long[] traceParents = EMPTY; + private SpanId[] traceParents = EMPTY; private int traceParentsUsed; - private Span span; + private TraceScope scope; /** * Create a new packet. @@ -293,7 +295,10 @@ class DFSPacket { addTraceParent(span.getSpanId()); } - public void addTraceParent(long id) { + public void addTraceParent(SpanId id) { + if (!id.isValid()) { + return; + } if (traceParentsUsed == traceParents.length) { int newLength = (traceParents.length == 0) ? 8 : traceParents.length * 2; @@ -310,18 +315,18 @@ class DFSPacket { * * Protected by the DFSOutputStream dataQueue lock. */ - public long[] getTraceParents() { + public SpanId[] getTraceParents() { // Remove duplicates from the array. int len = traceParentsUsed; Arrays.sort(traceParents, 0, len); int i = 0, j = 0; - long prevVal = 0; // 0 is not a valid span id + SpanId prevVal = SpanId.INVALID; while (true) { if (i == len) { break; } - long val = traceParents[i]; - if (val != prevVal) { + SpanId val = traceParents[i]; + if (!val.equals(prevVal)) { traceParents[j] = val; j++; prevVal = val; @@ -335,11 +340,11 @@ class DFSPacket { return traceParents; } - public void setTraceSpan(Span span) { - this.span = span; + public void setTraceScope(TraceScope scope) { + this.scope = scope; } - public Span getTraceSpan() { - return span; + public TraceScope getTraceScope() { + return scope; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java index fb57825..6482966 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java @@ -41,6 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; import org.apache.hadoop.hdfs.client.impl.DfsClientConf; @@ -79,12 +80,11 @@ import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; import org.apache.hadoop.util.Time; -import org.apache.htrace.NullScope; -import org.apache.htrace.Sampler; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceInfo; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.Sampler; +import org.apache.htrace.core.Span; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -505,7 +505,7 @@ class DataStreamer extends Daemon { @Override public void run() { long lastPacket = Time.monotonicNow(); - TraceScope scope = NullScope.INSTANCE; + TraceScope scope = null; while (!streamerClosed && dfsClient.clientRunning) { // if the Responder encountered an error, shutdown Responder if (errorState.hasError() && response != null) { @@ -556,12 +556,11 @@ class DataStreamer extends Daemon { LOG.warn("Caught exception", e); } one = dataQueue.getFirst(); // regular data packet - long parents[] = one.getTraceParents(); + SpanId[] parents = one.getTraceParents(); if (parents.length > 0) { - scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0])); - // TODO: use setParents API once it's available from HTrace 3.2 - // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS); - // scope.getSpan().setParents(parents); + scope = dfsClient.getTracer(). + newScope("dataStreamer", parents[0]); + scope.getSpan().setParents(parents); } } } @@ -612,12 +611,16 @@ class DataStreamer extends Daemon { } // send the packet - Span span = null; + SpanId spanId = SpanId.INVALID; synchronized (dataQueue) { // move packet from dataQueue to ackQueue if (!one.isHeartbeatPacket()) { - span = scope.detach(); - one.setTraceSpan(span); + if (scope != null) { + spanId = scope.getSpanId(); + scope.detach(); + one.setTraceScope(scope); + } + scope = null; dataQueue.removeFirst(); ackQueue.addLast(one); dataQueue.notifyAll(); @@ -630,7 +633,8 @@ class DataStreamer extends Daemon { } // write out data to remote datanode - TraceScope writeScope = Trace.startSpan("writeTo", span); + TraceScope writeScope = dfsClient.getTracer(). + newScope("DataStreamer#writeTo", spanId); try { one.writeTo(blockStream); blockStream.flush(); @@ -697,7 +701,10 @@ class DataStreamer extends Daemon { streamerClosed = true; } } finally { - scope.close(); + if (scope != null) { + scope.close(); + scope = null; + } } } closeInternal(); @@ -731,7 +738,8 @@ class DataStreamer extends Daemon { * @throws IOException */ void waitForAckedSeqno(long seqno) throws IOException { - TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER); + TraceScope scope = dfsClient.getTracer(). + newScope("waitForAckedSeqno"); try { if (LOG.isDebugEnabled()) { LOG.debug("Waiting for ack for: " + seqno); @@ -781,7 +789,7 @@ class DataStreamer extends Daemon { while (!streamerClosed && dataQueue.size() + ackQueue.size() > dfsClient.getConf().getWriteMaxPackets()) { if (firstWait) { - Span span = Trace.currentSpan(); + Span span = Tracer.getCurrentSpan(); if (span != null) { span.addTimelineAnnotation("dataQueue.wait"); } @@ -802,7 +810,7 @@ class DataStreamer extends Daemon { } } } finally { - Span span = Trace.currentSpan(); + Span span = Tracer.getCurrentSpan(); if ((span != null) && (!firstWait)) { span.addTimelineAnnotation("end.wait"); } @@ -934,7 +942,7 @@ class DataStreamer extends Daemon { setName("ResponseProcessor for block " + block); PipelineAck ack = new PipelineAck(); - TraceScope scope = NullScope.INSTANCE; + TraceScope scope = null; while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) { // process responses from datanodes. try { @@ -1021,8 +1029,11 @@ class DataStreamer extends Daemon { block.setNumBytes(one.getLastByteOffsetBlock()); synchronized (dataQueue) { - scope = Trace.continueSpan(one.getTraceSpan()); - one.setTraceSpan(null); + scope = one.getTraceScope(); + if (scope != null) { + scope.reattach(); + one.setTraceScope(null); + } lastAckedSeqno = seqno; ackQueue.removeFirst(); dataQueue.notifyAll(); @@ -1043,7 +1054,10 @@ class DataStreamer extends Daemon { responderClosed = true; } } finally { + if (scope != null) { scope.close(); + } + scope = null; } } } @@ -1109,11 +1123,12 @@ class DataStreamer extends Daemon { // a client waiting on close() will be aware that the flush finished. synchronized (dataQueue) { DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet - Span span = endOfBlockPacket.getTraceSpan(); - if (span != null) { - // Close any trace span associated with this Packet - TraceScope scope = Trace.continueSpan(span); + // Close any trace span associated with this Packet + TraceScope scope = endOfBlockPacket.getTraceScope(); + if (scope != null) { + scope.reattach(); scope.close(); + endOfBlockPacket.setTraceScope(null); } assert endOfBlockPacket.isLastPacketInBlock(); assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1; @@ -1741,7 +1756,7 @@ class DataStreamer extends Daemon { void queuePacket(DFSPacket packet) { synchronized (dataQueue) { if (packet == null) return; - packet.addTraceParent(Trace.currentSpan()); + packet.addTraceParent(Tracer.getCurrentSpanId()); dataQueue.addLast(packet); lastQueuedSeqno = packet.getSeqno(); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java index 7509da5..15a5bee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java @@ -47,9 +47,8 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,6 +105,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { int dataLeft = 0; private final PeerCache peerCache; + + private final Tracer tracer; /* FSInputChecker interface */ @@ -210,9 +211,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { protected synchronized int readChunk(long pos, byte[] buf, int offset, int len, byte[] checksumBuf) throws IOException { - TraceScope scope = - Trace.startSpan("RemoteBlockReader#readChunk(" + blockId + ")", - Sampler.NEVER); + TraceScope scope = tracer. + newScope("RemoteBlockReader#readChunk(" + blockId + ")"); try { return readChunkImpl(pos, buf, offset, len, checksumBuf); } finally { @@ -346,7 +346,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { private RemoteBlockReader(String file, String bpid, long blockId, DataInputStream in, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID, PeerCache peerCache) { + DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) { // Path is used only for printing block and file information in debug super(new Path("/" + Block.BLOCK_FILE_PREFIX + blockId + ":" + bpid + ":of:"+ file)/*too non path-like?*/, @@ -378,6 +378,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { bytesPerChecksum = this.checksum.getBytesPerChecksum(); checksumSize = this.checksum.getChecksumSize(); this.peerCache = peerCache; + this.tracer = tracer; } /** @@ -402,7 +403,8 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache, - CachingStrategy cachingStrategy) + CachingStrategy cachingStrategy, + Tracer tracer) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = @@ -438,7 +440,7 @@ public class RemoteBlockReader extends FSInputChecker implements BlockReader { return new RemoteBlockReader(file, block.getBlockPoolId(), block.getBlockId(), in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, - peer, datanodeID, peerCache); + peer, datanodeID, peerCache, tracer); } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java index 5541e6d..7a7932d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java @@ -48,12 +48,11 @@ import org.apache.hadoop.hdfs.shortcircuit.ClientMmap; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.TraceScope; import com.google.common.annotations.VisibleForTesting; +import org.apache.htrace.core.Tracer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,6 +125,8 @@ public class RemoteBlockReader2 implements BlockReader { private boolean sentStatusCode = false; + private final Tracer tracer; + @VisibleForTesting public Peer getPeer() { return peer; @@ -144,8 +145,8 @@ public class RemoteBlockReader2 implements BlockReader { } if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - TraceScope scope = Trace.startSpan( - "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER); + TraceScope scope = tracer.newScope( + "RemoteBlockReader2#readNextPacket(" + blockId + ")"); try { readNextPacket(); } finally { @@ -172,8 +173,8 @@ public class RemoteBlockReader2 implements BlockReader { @Override public synchronized int read(ByteBuffer buf) throws IOException { if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) { - TraceScope scope = Trace.startSpan( - "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER); + TraceScope scope = tracer.newScope( + "RemoteBlockReader2#readNextPacket(" + blockId + ")"); try { readNextPacket(); } finally { @@ -292,7 +293,7 @@ public class RemoteBlockReader2 implements BlockReader { protected RemoteBlockReader2(String file, String bpid, long blockId, DataChecksum checksum, boolean verifyChecksum, long startOffset, long firstChunkOffset, long bytesToRead, Peer peer, - DatanodeID datanodeID, PeerCache peerCache) { + DatanodeID datanodeID, PeerCache peerCache, Tracer tracer) { this.isLocal = DFSUtilClient.isLocalAddress(NetUtils. createSocketAddr(datanodeID.getXferAddr())); // Path is used only for printing block and file information in debug @@ -313,6 +314,7 @@ public class RemoteBlockReader2 implements BlockReader { this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset); bytesPerChecksum = this.checksum.getBytesPerChecksum(); checksumSize = this.checksum.getChecksumSize(); + this.tracer = tracer; } @@ -407,7 +409,8 @@ public class RemoteBlockReader2 implements BlockReader { String clientName, Peer peer, DatanodeID datanodeID, PeerCache peerCache, - CachingStrategy cachingStrategy) throws IOException { + CachingStrategy cachingStrategy, + Tracer tracer) throws IOException { // in and out will be closed when sock is closed (by the caller) final DataOutputStream out = new DataOutputStream(new BufferedOutputStream( peer.getOutputStream())); @@ -440,7 +443,7 @@ public class RemoteBlockReader2 implements BlockReader { return new RemoteBlockReader2(file, block.getBlockPoolId(), block.getBlockId(), checksum, verifyChecksum, startOffset, firstChunkOffset, len, peer, - datanodeID, peerCache); + datanodeID, peerCache, tracer); } static void checkSuccess( http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java index 923cdb4..f144a55 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java @@ -25,11 +25,10 @@ import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.BatchedRemoteIterator; import org.apache.hadoop.fs.InvalidRequestException; import org.apache.hadoop.ipc.RemoteException; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; import com.google.common.base.Preconditions; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; /** * CacheDirectiveIterator is a remote iterator that iterates cache directives. @@ -42,14 +41,14 @@ public class CacheDirectiveIterator private CacheDirectiveInfo filter; private final ClientProtocol namenode; - private final Sampler<?> traceSampler; + private final Tracer tracer; public CacheDirectiveIterator(ClientProtocol namenode, - CacheDirectiveInfo filter, Sampler<?> traceSampler) { + CacheDirectiveInfo filter, Tracer tracer) { super(0L); this.namenode = namenode; this.filter = filter; - this.traceSampler = traceSampler; + this.tracer = tracer; } private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) { @@ -94,7 +93,7 @@ public class CacheDirectiveIterator public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey) throws IOException { BatchedEntries<CacheDirectiveEntry> entries = null; - TraceScope scope = Trace.startSpan("listCacheDirectives", traceSampler); + TraceScope scope = tracer.newScope("listCacheDirectives"); try { entries = namenode.listCacheDirectives(prevKey, filter); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java index e9481f7..5e2bbf2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java @@ -23,9 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.BatchedRemoteIterator; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; /** * CachePoolIterator is a remote iterator that iterates cache pools. @@ -37,18 +36,18 @@ public class CachePoolIterator extends BatchedRemoteIterator<String, CachePoolEntry> { private final ClientProtocol namenode; - private final Sampler traceSampler; + private final Tracer tracer; - public CachePoolIterator(ClientProtocol namenode, Sampler traceSampler) { + public CachePoolIterator(ClientProtocol namenode, Tracer tracer) { super(""); this.namenode = namenode; - this.traceSampler = traceSampler; + this.tracer = tracer; } @Override public BatchedEntries<CachePoolEntry> makeRequest(String prevKey) throws IOException { - TraceScope scope = Trace.startSpan("listCachePools", traceSampler); + TraceScope scope = tracer.newScope("listCachePools"); try { return namenode.listCachePools(prevKey); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java index 0141215..a3cff82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java @@ -23,9 +23,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.BatchedRemoteIterator; -import org.apache.htrace.Sampler; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; /** * EncryptionZoneIterator is a remote iterator that iterates over encryption @@ -37,19 +36,18 @@ public class EncryptionZoneIterator extends BatchedRemoteIterator<Long, EncryptionZone> { private final ClientProtocol namenode; - private final Sampler<?> traceSampler; + private final Tracer tracer; - public EncryptionZoneIterator(ClientProtocol namenode, - Sampler<?> traceSampler) { + public EncryptionZoneIterator(ClientProtocol namenode, Tracer tracer) { super(Long.valueOf(0)); this.namenode = namenode; - this.traceSampler = traceSampler; + this.tracer = tracer; } @Override public BatchedEntries<EncryptionZone> makeRequest(Long prevId) throws IOException { - TraceScope scope = Trace.startSpan("listEncryptionZones", traceSampler); + TraceScope scope = tracer.newScope("listEncryptionZones"); try { return namenode.listEncryptionZones(prevId); } finally { http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java index e2e5f39..e585328 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java @@ -35,10 +35,8 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceInfo; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.Tracer; /** * Static utilities for dealing with the protocol buffers used by the @@ -89,39 +87,21 @@ public abstract class DataTransferProtoUtil { BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder() .setBlock(PBHelperClient.convert(blk)) .setToken(PBHelperClient.convert(blockToken)); - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); + SpanId spanId = Tracer.getCurrentSpanId(); + if (spanId.isValid()) { builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() - .setTraceId(s.getTraceId()) - .setParentId(s.getSpanId())); + .setTraceId(spanId.getHigh()) + .setParentId(spanId.getLow())); } return builder.build(); } - public static TraceInfo fromProto(DataTransferTraceInfoProto proto) { - if (proto == null) return null; - if (!proto.hasTraceId()) return null; - return new TraceInfo(proto.getTraceId(), proto.getParentId()); - } - - public static TraceScope continueTraceSpan(ClientOperationHeaderProto header, - String description) { - return continueTraceSpan(header.getBaseHeader(), description); - } - - public static TraceScope continueTraceSpan(BaseHeaderProto header, - String description) { - return continueTraceSpan(header.getTraceInfo(), description); - } - - public static TraceScope continueTraceSpan(DataTransferTraceInfoProto proto, - String description) { - TraceScope scope = null; - TraceInfo info = fromProto(proto); - if (info != null) { - scope = Trace.startSpan(description, info); + public static SpanId fromProto(DataTransferTraceInfoProto proto) { + if ((proto != null) && proto.hasTraceId() && + proto.hasParentId()) { + return new SpanId(proto.getTraceId(), proto.getParentId()); } - return scope; + return null; } public static void checkBlockOpStatus( http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index 2d11dc2..e856211 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -48,8 +48,8 @@ import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; -import org.apache.htrace.Trace; -import org.apache.htrace.Span; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.Tracer; import com.google.protobuf.Message; @@ -200,10 +200,11 @@ public class Sender implements DataTransferProtocol { ReleaseShortCircuitAccessRequestProto.Builder builder = ReleaseShortCircuitAccessRequestProto.newBuilder(). setSlotId(PBHelperClient.convert(slotId)); - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() - .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); + SpanId spanId = Tracer.getCurrentSpanId(); + if (spanId.isValid()) { + builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder(). + setTraceId(spanId.getHigh()). + setParentId(spanId.getLow())); } ReleaseShortCircuitAccessRequestProto proto = builder.build(); send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto); @@ -214,10 +215,11 @@ public class Sender implements DataTransferProtocol { ShortCircuitShmRequestProto.Builder builder = ShortCircuitShmRequestProto.newBuilder(). setClientName(clientName); - if (Trace.isTracing()) { - Span s = Trace.currentSpan(); - builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() - .setTraceId(s.getTraceId()).setParentId(s.getSpanId())); + SpanId spanId = Tracer.getCurrentSpanId(); + if (spanId.isValid()) { + builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder(). + setTraceId(spanId.getHigh()). + setParentId(spanId.getLow())); } ShortCircuitShmRequestProto proto = builder.build(); send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 9f44e77..2f274a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -638,6 +638,8 @@ Release 2.8.0 - UNRELEASED HDFS-8740. Move DistributedFileSystem to hadoop-hdfs-client. (Mingliang Liu via wheat9) + HDFS-9080. Update htrace version to 4.0.1 (cmccabe) + OPTIMIZATIONS HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 4c2ded5..0ca878c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -192,7 +192,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> </dependency> <dependency> <groupId>org.apache.htrace</groupId> - <artifactId>htrace-core</artifactId> + <artifactId>htrace-core4</artifactId> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 0ba174f..416b9e4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -64,12 +64,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT = HdfsClientConfigKeys.DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT; - // HDFS HTrace configuration is controlled by dfs.htrace.spanreceiver.classes, - // etc. - public static final String DFS_SERVER_HTRACE_PREFIX = "dfs.htrace."; - @Deprecated - public static final String DFS_CLIENT_HTRACE_PREFIX = - HdfsClientConfigKeys.DFS_CLIENT_HTRACE_PREFIX; + public static final String DFS_CLIENT_HTRACE_SAMPLER_CLASSES = + "dfs.client.htrace.sampler.classes"; // HA related configuration public static final String DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY = "dfs.datanode.restart.replica.expiration"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b9d3262/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java index 59cf884..e040157 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs.protocol.datatransfer; import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.fromProto; -import static org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil.continueTraceSpan; import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed; import java.io.DataInputStream; @@ -27,7 +26,10 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferTraceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; @@ -40,14 +42,21 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmR import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId; -import org.apache.htrace.TraceScope; +import org.apache.htrace.core.SpanId; +import org.apache.htrace.core.TraceScope; +import org.apache.htrace.core.Tracer; /** Receiver */ @InterfaceAudience.Private @InterfaceStability.Evolving public abstract class Receiver implements DataTransferProtocol { + private final Tracer tracer; protected DataInputStream in; - + + protected Receiver(Tracer tracer) { + this.tracer = tracer; + } + /** Initialize a receiver for DataTransferProtocol with a socket. */ protected void initialize(final DataInputStream in) { this.in = in; @@ -64,6 +73,26 @@ public abstract class Receiver implements DataTransferProtocol { return Op.read(in); } + private TraceScope continueTraceSpan(DataTransferTraceInfoProto proto, + String description) { + TraceScope scope = null; + SpanId spanId = fromProto(proto); + if (spanId != null) { + scope = tracer.newScope(description, spanId); + } + return scope; + } + + private TraceScope continueTraceSpan(ClientOperationHeaderProto header, + String description) { + return continueTraceSpan(header.getBaseHeader(), description); + } + + private TraceScope continueTraceSpan(BaseHeaderProto header, + String description) { + return continueTraceSpan(header.getTraceInfo(), description); + } + /** Process op by the corresponding method. */ protected final void processOp(Op op) throws IOException { switch(op) {
