http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/RollingUpgradeInfo.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/RollingUpgradeInfo.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/RollingUpgradeInfo.java index b527797..b7a7e98 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/RollingUpgradeInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/RollingUpgradeInfo.java @@ -64,7 +64,6 @@ public class RollingUpgradeInfo extends RollingUpgradeStatus { /** * Finalize the upgrade if not already finalized - * @param finalizeTime */ public void finalize(long finalizeTime) { if (finalizeTime != 0) { @@ -99,8 +98,11 @@ public class RollingUpgradeInfo extends RollingUpgradeStatus { @Override public String toString() { return super.toString() - + "\n Start Time: " + (startTime == 0? "<NOT STARTED>": timestamp2String(startTime)) - + "\n Finalize Time: " + (finalizeTime == 0? "<NOT FINALIZED>": timestamp2String(finalizeTime)); + + "\n Start Time: " + + (startTime == 0 ? "<NOT STARTED>" : timestamp2String(startTime)) + + "\n Finalize Time: " + + (finalizeTime == 0 ? "<NOT FINALIZED>" : + timestamp2String(finalizeTime)); } private static String timestamp2String(long timestamp) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java index b58ed36..be3b94d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshotDiffReport.java @@ -27,7 +27,7 @@ import com.google.common.base.Objects; import org.apache.hadoop.hdfs.DFSUtilClient; /** - * This class represents to end users the difference between two snapshots of + * This class represents to end users the difference between two snapshots of * the same directory, or the difference between a snapshot of the directory and * its current state. Instead of capturing all the details of the diff, this * class only lists where the changes happened and their types. @@ -42,21 +42,21 @@ public class SnapshotDiffReport { * DELETE, and RENAME respectively. */ public enum DiffType { - CREATE("+"), - MODIFY("M"), - DELETE("-"), + CREATE("+"), + MODIFY("M"), + DELETE("-"), RENAME("R"); - + private final String label; - - private DiffType(String label) { + + DiffType(String label) { this.label = label; } - + public String getLabel() { return label; } - + public static DiffType getTypeFromLabel(String label) { if (label.equals(CREATE.getLabel())) { return CREATE; @@ -69,8 +69,8 @@ public class SnapshotDiffReport { } return null; } - }; - + } + /** * Representing the full path and diff type of a file/directory where changes * have happened. @@ -98,7 +98,7 @@ public class SnapshotDiffReport { this.sourcePath = sourcePath; this.targetPath = targetPath; } - + public DiffReportEntry(DiffType type, byte[][] sourcePathComponents, byte[][] targetPathComponents) { this.type = type; @@ -106,7 +106,7 @@ public class SnapshotDiffReport { this.targetPath = targetPathComponents == null ? null : DFSUtilClient .byteArray2bytes(targetPathComponents); } - + @Override public String toString() { String str = type.getLabel() + "\t" + getPathString(sourcePath); @@ -115,7 +115,7 @@ public class SnapshotDiffReport { } return str; } - + public DiffType getType() { return type; } @@ -141,7 +141,7 @@ public class SnapshotDiffReport { public boolean equals(Object other) { if (this == other) { return true; - } + } if (other != null && other instanceof DiffReportEntry) { DiffReportEntry entry = (DiffReportEntry) other; return type.equals(entry.getType()) @@ -150,25 +150,25 @@ public class SnapshotDiffReport { } return false; } - + @Override public int hashCode() { return Objects.hashCode(getSourcePath(), getTargetPath()); } } - + /** snapshot root full path */ private final String snapshotRoot; /** start point of the diff */ private final String fromSnapshot; - + /** end point of the diff */ private final String toSnapshot; - + /** list of diff */ private final List<DiffReportEntry> diffList; - + public SnapshotDiffReport(String snapshotRoot, String fromSnapshot, String toSnapshot, List<DiffReportEntry> entryList) { this.snapshotRoot = snapshotRoot; @@ -177,7 +177,7 @@ public class SnapshotDiffReport { this.diffList = entryList != null ? entryList : Collections .<DiffReportEntry> emptyList(); } - + /** @return {@link #snapshotRoot}*/ public String getSnapshotRoot() { return snapshotRoot; @@ -192,23 +192,24 @@ public class SnapshotDiffReport { public String getLaterSnapshotName() { return toSnapshot; } - + /** @return {@link #diffList} */ public List<DiffReportEntry> getDiffList() { return diffList; } - + @Override public String toString() { StringBuilder str = new StringBuilder(); - String from = fromSnapshot == null || fromSnapshot.isEmpty() ? + String from = fromSnapshot == null || fromSnapshot.isEmpty() ? "current directory" : "snapshot " + fromSnapshot; String to = toSnapshot == null || toSnapshot.isEmpty() ? "current directory" : "snapshot " + toSnapshot; - str.append("Difference between " + from + " and " + to - + " under directory " + snapshotRoot + ":" + LINE_SEPARATOR); + str.append("Difference between ").append(from).append(" and ").append(to) + .append(" under directory ").append(snapshotRoot).append(":") + .append(LINE_SEPARATOR); for (DiffReportEntry entry : diffList) { - str.append(entry.toString() + LINE_SEPARATOR); + str.append(entry.toString()).append(LINE_SEPARATOR); } return str.toString(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java index ac19d44..914b45c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/SnapshottableDirectoryStatus.java @@ -31,30 +31,31 @@ import org.apache.hadoop.hdfs.DFSUtilClient; */ public class SnapshottableDirectoryStatus { /** Compare the statuses by full paths. */ - public static final Comparator<SnapshottableDirectoryStatus> COMPARATOR - = new Comparator<SnapshottableDirectoryStatus>() { - @Override - public int compare(SnapshottableDirectoryStatus left, - SnapshottableDirectoryStatus right) { - int d = DFSUtilClient.compareBytes(left.parentFullPath, right.parentFullPath); - return d != 0? d - : DFSUtilClient.compareBytes(left.dirStatus.getLocalNameInBytes(), + public static final Comparator<SnapshottableDirectoryStatus> COMPARATOR = + new Comparator<SnapshottableDirectoryStatus>() { + @Override + public int compare(SnapshottableDirectoryStatus left, + SnapshottableDirectoryStatus right) { + int d = DFSUtilClient.compareBytes(left.parentFullPath, + right.parentFullPath); + return d != 0? d + : DFSUtilClient.compareBytes(left.dirStatus.getLocalNameInBytes(), right.dirStatus.getLocalNameInBytes()); - } - }; + } + }; /** Basic information of the snapshottable directory */ private final HdfsFileStatus dirStatus; - + /** Number of snapshots that have been taken*/ private final int snapshotNumber; - + /** Number of snapshots allowed. */ private final int snapshotQuota; - + /** Full path of the parent. */ private final byte[] parentFullPath; - + public SnapshottableDirectoryStatus(long modification_time, long access_time, FsPermission permission, String owner, String group, byte[] localName, long inodeId, int childrenNum, @@ -80,7 +81,7 @@ public class SnapshottableDirectoryStatus { public int getSnapshotQuota() { return snapshotQuota; } - + /** * @return Full path of the parent */ @@ -94,13 +95,13 @@ public class SnapshottableDirectoryStatus { public HdfsFileStatus getDirStatus() { return dirStatus; } - + /** * @return Full path of the file */ public Path getFullPath() { - String parentFullPathStr = - (parentFullPath == null || parentFullPath.length == 0) ? + String parentFullPathStr = + (parentFullPath == null || parentFullPath.length == 0) ? null : DFSUtilClient.bytes2String(parentFullPath); if (parentFullPathStr == null && dirStatus.getLocalNameInBytes().length == 0) { @@ -111,13 +112,13 @@ public class SnapshottableDirectoryStatus { : new Path(parentFullPathStr, dirStatus.getLocalName()); } } - + /** * Print a list of {@link SnapshottableDirectoryStatus} out to a given stream. * @param stats The list of {@link SnapshottableDirectoryStatus} * @param out The given stream for printing. */ - public static void print(SnapshottableDirectoryStatus[] stats, + public static void print(SnapshottableDirectoryStatus[] stats, PrintStream out) { if (stats == null || stats.length == 0) { out.println(); @@ -133,30 +134,28 @@ public class SnapshottableDirectoryStatus { maxSnapshotNum = maxLength(maxSnapshotNum, status.snapshotNumber); maxSnapshotQuota = maxLength(maxSnapshotQuota, status.snapshotQuota); } - - StringBuilder fmt = new StringBuilder(); - fmt.append("%s%s "); // permission string - fmt.append("%" + maxRepl + "s "); - fmt.append((maxOwner > 0) ? "%-" + maxOwner + "s " : "%s"); - fmt.append((maxGroup > 0) ? "%-" + maxGroup + "s " : "%s"); - fmt.append("%" + maxLen + "s "); - fmt.append("%s "); // mod time - fmt.append("%" + maxSnapshotNum + "s "); - fmt.append("%" + maxSnapshotQuota + "s "); - fmt.append("%s"); // path - - String lineFormat = fmt.toString(); + + String lineFormat = "%s%s " // permission string + + "%" + maxRepl + "s " + + (maxOwner > 0 ? "%-" + maxOwner + "s " : "%s") + + (maxGroup > 0 ? "%-" + maxGroup + "s " : "%s") + + "%" + maxLen + "s " + + "%s " // mod time + + "%" + maxSnapshotNum + "s " + + "%" + maxSnapshotQuota + "s " + + "%s"; // path + SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm"); - + for (SnapshottableDirectoryStatus status : stats) { - String line = String.format(lineFormat, "d", + String line = String.format(lineFormat, "d", status.dirStatus.getPermission(), status.dirStatus.getReplication(), status.dirStatus.getOwner(), status.dirStatus.getGroup(), String.valueOf(status.dirStatus.getLen()), dateFormat.format(new Date(status.dirStatus.getModificationTime())), - status.snapshotNumber, status.snapshotQuota, + status.snapshotNumber, status.snapshotQuota, status.getFullPath().toString() ); out.println(line); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java index 03fb704..fc1d3d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java @@ -18,14 +18,12 @@ package org.apache.hadoop.hdfs.protocol; -import java.io.IOException; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.UnresolvedLinkException; import org.apache.hadoop.fs.Path; -/** +/** * Thrown when a symbolic link is encountered in a path. */ @InterfaceAudience.Private @@ -43,7 +41,7 @@ public final class UnresolvedPathException extends UnresolvedLinkException { public UnresolvedPathException(String msg) { super(msg); } - + public UnresolvedPathException(String path, String preceding, String remainder, String linkTarget) { this.path = path; @@ -55,7 +53,7 @@ public final class UnresolvedPathException extends UnresolvedLinkException { /** * Return a path with the link resolved with the target. */ - public Path getResolvedPath() throws IOException { + public Path getResolvedPath() { // If the path is absolute we cam throw out the preceding part and // just append the remainder to the target, otherwise append each // piece to resolve the link in path. @@ -76,12 +74,6 @@ public final class UnresolvedPathException extends UnresolvedLinkException { if (msg != null) { return msg; } - String myMsg = "Unresolved path " + path; - try { - return getResolvedPath().toString(); - } catch (IOException e) { - // Ignore - } - return myMsg; + return getResolvedPath().toString(); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java index 5f86e52..36f5fd9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/BlockConstructionStage.java @@ -25,7 +25,7 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Evolving public enum BlockConstructionStage { /** The enumerates are always listed as regular stage followed by the - * recovery stage. + * recovery stage. * Changing this order will make getRecoveryStage not working. */ // pipeline set up for block append @@ -46,9 +46,9 @@ public enum BlockConstructionStage { TRANSFER_RBW, // transfer Finalized for adding datanodes TRANSFER_FINALIZED; - + final static private byte RECOVERY_BIT = (byte)1; - + /** * get the recovery stage of this stage */ @@ -59,4 +59,4 @@ public enum BlockConstructionStage { return values()[ordinal()|RECOVERY_BIT]; } } -} +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java index e585328..6801149 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java @@ -65,7 +65,9 @@ public abstract class DataTransferProtoUtil { } public static DataChecksum fromProto(ChecksumProto proto) { - if (proto == null) return null; + if (proto == null) { + return null; + } int bytesPerChecksum = proto.getBytesPerChecksum(); DataChecksum.Type type = PBHelperClient.convert(proto.getType()); @@ -74,19 +76,17 @@ public abstract class DataTransferProtoUtil { static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk, String client, Token<BlockTokenIdentifier> blockToken) { - ClientOperationHeaderProto header = - ClientOperationHeaderProto.newBuilder() - .setBaseHeader(buildBaseHeader(blk, blockToken)) - .setClientName(client) - .build(); - return header; + return ClientOperationHeaderProto.newBuilder() + .setBaseHeader(buildBaseHeader(blk, blockToken)) + .setClientName(client) + .build(); } static BaseHeaderProto buildBaseHeader(ExtendedBlock blk, Token<BlockTokenIdentifier> blockToken) { BaseHeaderProto.Builder builder = BaseHeaderProto.newBuilder() - .setBlock(PBHelperClient.convert(blk)) - .setToken(PBHelperClient.convert(blockToken)); + .setBlock(PBHelperClient.convert(blk)) + .setToken(PBHelperClient.convert(blockToken)); SpanId spanId = Tracer.getCurrentSpanId(); if (spanId.isValid()) { builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder() http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java index 1f7e378..4aa545b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java @@ -39,21 +39,21 @@ import org.slf4j.LoggerFactory; @InterfaceAudience.Private @InterfaceStability.Evolving public interface DataTransferProtocol { - public static final Logger LOG = LoggerFactory.getLogger(DataTransferProtocol.class); - + Logger LOG = LoggerFactory.getLogger(DataTransferProtocol.class); + /** Version for data transfers between clients and datanodes * This should change when serialization of DatanodeInfo, not just - * when protocol changes. It is not very obvious. + * when protocol changes. It is not very obvious. */ /* * Version 28: * Declare methods in DataTransferProtocol interface. */ - public static final int DATA_TRANSFER_VERSION = 28; + int DATA_TRANSFER_VERSION = 28; - /** + /** * Read a block. - * + * * @param blk the block being read. * @param blockToken security token for accessing the block. * @param clientName client's name. @@ -63,7 +63,7 @@ public interface DataTransferProtocol { * checksums * @param cachingStrategy The caching strategy to use. */ - public void readBlock(final ExtendedBlock blk, + void readBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final long blockOffset, @@ -77,7 +77,7 @@ public interface DataTransferProtocol { * The other downstream datanodes are specified by the targets parameter. * Note that the receiver {@link DatanodeInfo} is not required in the * parameter list since the receiver datanode knows its info. However, the - * {@link StorageType} for storing the replica in the receiver datanode is a + * {@link StorageType} for storing the replica in the receiver datanode is a * parameter since the receiver datanode may support multiple storage types. * * @param blk the block being written. @@ -96,12 +96,12 @@ public interface DataTransferProtocol { * @param pinning whether to pin the block, so Balancer won't move it. * @param targetPinnings whether to pin the block on target datanode */ - public void writeBlock(final ExtendedBlock blk, - final StorageType storageType, + void writeBlock(final ExtendedBlock blk, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -118,13 +118,13 @@ public interface DataTransferProtocol { * The block stage must be * either {@link BlockConstructionStage#TRANSFER_RBW} * or {@link BlockConstructionStage#TRANSFER_FINALIZED}. - * + * * @param blk the block being transferred. * @param blockToken security token for accessing the block. * @param clientName client's name. * @param targets target datanodes. */ - public void transferBlock(final ExtendedBlock blk, + void transferBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, @@ -135,14 +135,14 @@ public interface DataTransferProtocol { * * @param blk The block to get file descriptors for. * @param blockToken Security token for accessing the block. - * @param slotId The shared memory slot id to use, or null + * @param slotId The shared memory slot id to use, or null * to use no slot id. - * @param maxVersion Maximum version of the block data the client + * @param maxVersion Maximum version of the block data the client * can understand. * @param supportsReceiptVerification True if the client supports * receipt verification. */ - public void requestShortCircuitFds(final ExtendedBlock blk, + void requestShortCircuitFds(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, SlotId slotId, int maxVersion, boolean supportsReceiptVerification) throws IOException; @@ -152,51 +152,51 @@ public interface DataTransferProtocol { * * @param slotId SlotID used by the earlier file descriptors. */ - public void releaseShortCircuitFds(final SlotId slotId) throws IOException; + void releaseShortCircuitFds(final SlotId slotId) throws IOException; /** * Request a short circuit shared memory area from a DataNode. - * + * * @param clientName The name of the client. */ - public void requestShortCircuitShm(String clientName) throws IOException; - + void requestShortCircuitShm(String clientName) throws IOException; + /** * Receive a block from a source datanode * and then notifies the namenode * to remove the copy from the original datanode. * Note that the source datanode and the original datanode can be different. * It is used for balancing purpose. - * + * * @param blk the block being replaced. * @param storageType the {@link StorageType} for storing the block. * @param blockToken security token for accessing the block. * @param delHint the hint for deleting the block in the original datanode. * @param source the source datanode for receiving the block. */ - public void replaceBlock(final ExtendedBlock blk, - final StorageType storageType, + void replaceBlock(final ExtendedBlock blk, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String delHint, final DatanodeInfo source) throws IOException; /** - * Copy a block. + * Copy a block. * It is used for balancing purpose. - * + * * @param blk the block being copied. * @param blockToken security token for accessing the block. */ - public void copyBlock(final ExtendedBlock blk, + void copyBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken) throws IOException; /** * Get block checksum (MD5 of CRC32). - * + * * @param blk a block. * @param blockToken security token for accessing the block. * @throws IOException */ - public void blockChecksum(final ExtendedBlock blk, + void blockChecksum(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java index 23407f8..4157a30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/IOStreamPair.java @@ -29,9 +29,9 @@ import org.apache.hadoop.classification.InterfaceAudience; public class IOStreamPair { public final InputStream in; public final OutputStream out; - + public IOStreamPair(InputStream in, OutputStream out) { this.in = in; this.out = out; } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java index 3077498..511574c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java @@ -42,11 +42,11 @@ public enum Op { /** The code for this operation. */ public final byte code; - - private Op(byte code) { + + Op(byte code) { this.code = code; } - + private static final int FIRST_CODE = values()[0].code; /** Return the object represented by the code. */ private static Op valueOf(byte code) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java index c9966a7..dbf5e80 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java @@ -36,14 +36,14 @@ import com.google.protobuf.InvalidProtocolBufferException; * Header data for each packet that goes through the read/write pipelines. * Includes all of the information about the packet, excluding checksums and * actual data. - * + * * This data includes: * - the offset in bytes into the HDFS block of the data in this packet * - the sequence number of this packet in the pipeline * - whether or not this is the last packet in the pipeline * - the length of the data in this packet * - whether or not this packet should be synced by the DNs. - * + * * When serialized, this header is written out as a protocol buffer, preceded * by a 4-byte integer representing the full packet length, and a 2-byte short * representing the header length. @@ -51,8 +51,7 @@ import com.google.protobuf.InvalidProtocolBufferException; @InterfaceAudience.Private @InterfaceStability.Evolving public class PacketHeader { - private static final int MAX_PROTO_SIZE = - PacketHeaderProto.newBuilder() + private static final int MAX_PROTO_SIZE = PacketHeaderProto.newBuilder() .setOffsetInBlock(0) .setSeqno(0) .setLastPacketInBlock(false) @@ -76,21 +75,21 @@ public class PacketHeader { Preconditions.checkArgument(packetLen >= Ints.BYTES, "packet len %s should always be at least 4 bytes", packetLen); - + PacketHeaderProto.Builder builder = PacketHeaderProto.newBuilder() - .setOffsetInBlock(offsetInBlock) - .setSeqno(seqno) - .setLastPacketInBlock(lastPacketInBlock) - .setDataLen(dataLen); - + .setOffsetInBlock(offsetInBlock) + .setSeqno(seqno) + .setLastPacketInBlock(lastPacketInBlock) + .setDataLen(dataLen); + if (syncBlock) { // Only set syncBlock if it is specified. // This is wire-incompatible with Hadoop 2.0.0-alpha due to HDFS-3721 // because it changes the length of the packet header, and BlockReceiver // in that version did not support variable-length headers. - builder.setSyncBlock(syncBlock); + builder.setSyncBlock(true); } - + proto = builder.build(); } @@ -121,16 +120,16 @@ public class PacketHeader { @Override public String toString() { return "PacketHeader with packetLen=" + packetLen + - " header data: " + + " header data: " + proto.toString(); } - + public void setFieldsFromData( int packetLen, byte[] headerData) throws InvalidProtocolBufferException { this.packetLen = packetLen; proto = PacketHeaderProto.parseFrom(headerData); } - + public void readFields(ByteBuffer buf) throws IOException { packetLen = buf.getInt(); short protoLen = buf.getShort(); @@ -138,7 +137,7 @@ public class PacketHeader { buf.get(data); proto = PacketHeaderProto.parseFrom(data); } - + public void readFields(DataInputStream in) throws IOException { this.packetLen = in.readInt(); short protoLen = in.readShort(); @@ -170,7 +169,7 @@ public class PacketHeader { throw new RuntimeException(e); } } - + public void write(DataOutputStream out) throws IOException { assert proto.getSerializedSize() <= MAX_PROTO_SIZE : "Expected " + (MAX_PROTO_SIZE) + " got: " + proto.getSerializedSize(); @@ -178,7 +177,7 @@ public class PacketHeader { out.writeShort(proto.getSerializedSize()); proto.writeTo(out); } - + public byte[] getBytes() { ByteBuffer buf = ByteBuffer.allocate(getSerializedSize()); putInBuffer(buf); @@ -187,8 +186,8 @@ public class PacketHeader { /** * Perform a sanity check on the packet, returning true if it is sane. - * @param lastSeqNo the previous sequence number received - we expect the current - * sequence number to be larger by 1. + * @param lastSeqNo the previous sequence number received - we expect the + * current sequence number to be larger by 1. */ public boolean sanityCheck(long lastSeqNo) { // We should only have a non-positive data length for the last packet @@ -196,8 +195,7 @@ public class PacketHeader { // The last packet should not contain data if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false; // Seqnos should always increase by 1 with each packet received - if (proto.getSeqno() != lastSeqNo + 1) return false; - return true; + return proto.getSeqno() == lastSeqNo + 1; } @Override http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java index e6709d9..93e9217 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketReceiver.java @@ -48,7 +48,7 @@ public class PacketReceiver implements Closeable { private static final int MAX_PACKET_SIZE = 16 * 1024 * 1024; static final Logger LOG = LoggerFactory.getLogger(PacketReceiver.class); - + private static final DirectBufferPool bufferPool = new DirectBufferPool(); private final boolean useDirectBuffers; @@ -58,12 +58,12 @@ public class PacketReceiver implements Closeable { * length prefixes. */ private ByteBuffer curPacketBuf = null; - + /** * A slice of {@link #curPacketBuf} which contains just the checksums. */ private ByteBuffer curChecksumSlice = null; - + /** * A slice of {@link #curPacketBuf} which contains just the data. */ @@ -73,7 +73,7 @@ public class PacketReceiver implements Closeable { * The packet header of the most recently read packet. */ private PacketHeader curHeader; - + public PacketReceiver(boolean useDirectBuffers) { this.useDirectBuffers = useDirectBuffers; reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN); @@ -86,14 +86,14 @@ public class PacketReceiver implements Closeable { public ByteBuffer getDataSlice() { return curDataSlice; } - + public ByteBuffer getChecksumSlice() { return curChecksumSlice; } /** * Reads all of the data for the next packet into the appropriate buffers. - * + * * The data slice and checksum slice members will be set to point to the * user data and corresponding checksums. The header will be parsed and * set. @@ -134,7 +134,7 @@ public class PacketReceiver implements Closeable { doReadFully(ch, in, curPacketBuf); curPacketBuf.flip(); int payloadLen = curPacketBuf.getInt(); - + if (payloadLen < Ints.BYTES) { // The "payload length" includes its own length. Therefore it // should never be less than 4 bytes @@ -146,7 +146,7 @@ public class PacketReceiver implements Closeable { if (headerLen < 0) { throw new IOException("Invalid header length " + headerLen); } - + LOG.trace("readNextPacket: dataPlusChecksumLen={}, headerLen={}", dataPlusChecksumLen, headerLen); @@ -177,18 +177,18 @@ public class PacketReceiver implements Closeable { curHeader = new PacketHeader(); } curHeader.setFieldsFromData(payloadLen, headerBuf); - + // Compute the sub-slices of the packet int checksumLen = dataPlusChecksumLen - curHeader.getDataLen(); if (checksumLen < 0) { - throw new IOException("Invalid packet: data length in packet header " + + throw new IOException("Invalid packet: data length in packet header " + "exceeds data length received. dataPlusChecksumLen=" + - dataPlusChecksumLen + " header: " + curHeader); + dataPlusChecksumLen + " header: " + curHeader); } - + reslicePacket(headerLen, checksumLen, curHeader.getDataLen()); } - + /** * Rewrite the last-read packet on the wire to the given output stream. */ @@ -200,7 +200,7 @@ public class PacketReceiver implements Closeable { curPacketBuf.remaining()); } - + private static void doReadFully(ReadableByteChannel ch, InputStream in, ByteBuffer buf) throws IOException { if (ch != null) { @@ -222,7 +222,7 @@ public class PacketReceiver implements Closeable { // 32-bit 16-bit <protobuf> <variable length> // |--- lenThroughHeader ----| // |----------- lenThroughChecksums ----| - // |------------------- lenThroughData ------| + // |------------------- lenThroughData ------| int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen; int lenThroughChecksums = lenThroughHeader + checksumsLen; int lenThroughData = lenThroughChecksums + dataLen; @@ -242,14 +242,14 @@ public class PacketReceiver implements Closeable { curPacketBuf.position(lenThroughChecksums); curPacketBuf.limit(lenThroughData); curDataSlice = curPacketBuf.slice(); - + // Reset buffer to point to the entirety of the packet (including // length prefixes) curPacketBuf.position(0); curPacketBuf.limit(lenThroughData); } - + private static void readChannelFully(ReadableByteChannel ch, ByteBuffer buf) throws IOException { while (buf.remaining() > 0) { @@ -259,7 +259,7 @@ public class PacketReceiver implements Closeable { } } } - + private void reallocPacketBuf(int atLeastCapacity) { // Realloc the buffer if this packet is longer than the previous // one. @@ -277,12 +277,12 @@ public class PacketReceiver implements Closeable { curPacketBuf.flip(); newBuf.put(curPacketBuf); } - + returnPacketBufToPool(); curPacketBuf = newBuf; } } - + private void returnPacketBufToPool() { if (curPacketBuf != null && curPacketBuf.isDirect()) { bufferPool.returnBuffer(curPacketBuf); @@ -294,7 +294,7 @@ public class PacketReceiver implements Closeable { public void close() { returnPacketBufToPool(); } - + @Override protected void finalize() throws Throwable { try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java index 3836606..be822d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PipelineAck.java @@ -94,7 +94,7 @@ public class PipelineAck { /** default constructor **/ public PipelineAck() { } - + /** * Constructor assuming no next DN in pipeline * @param seqno sequence number @@ -125,7 +125,7 @@ public class PipelineAck { .setDownstreamAckTimeNanos(downstreamAckTimeNanos) .build(); } - + /** * Get the sequence number * @return the sequence number @@ -133,7 +133,7 @@ public class PipelineAck { public long getSeqno() { return proto.getSeqno(); } - + /** * Get the number of replies * @return the number of replies @@ -141,7 +141,7 @@ public class PipelineAck { public short getNumOfReplies() { return (short)proto.getReplyCount(); } - + /** * get the header flag of ith reply */ @@ -179,7 +179,7 @@ public class PipelineAck { } /** - * Returns the OOB status if this ack contains one. + * Returns the OOB status if this ack contains one. * @return null if it is not an OOB ack. */ public Status getOOBStatus() { @@ -216,7 +216,7 @@ public class PipelineAck { public void write(OutputStream out) throws IOException { proto.writeDelimitedTo(out); } - + @Override //Object public String toString() { return TextFormat.shortDebugString(proto); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java index c69986a..c21a6a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java @@ -43,19 +43,19 @@ public class ReplaceDatanodeOnFailure { private final Condition condition; - private Policy(Condition condition) { + Policy(Condition condition) { this.condition = condition; } - + Condition getCondition() { return condition; } } /** Datanode replacement condition */ - private static interface Condition { + private interface Condition { /** Return true unconditionally. */ - static final Condition TRUE = new Condition() { + Condition TRUE = new Condition() { @Override public boolean satisfy(short replication, DatanodeInfo[] existings, int nExistings, boolean isAppend, boolean isHflushed) { @@ -64,7 +64,7 @@ public class ReplaceDatanodeOnFailure { }; /** Return false unconditionally. */ - static final Condition FALSE = new Condition() { + Condition FALSE = new Condition() { @Override public boolean satisfy(short replication, DatanodeInfo[] existings, int nExistings, boolean isAppend, boolean isHflushed) { @@ -80,31 +80,24 @@ public class ReplaceDatanodeOnFailure { * (1) floor(r/2) >= n; or * (2) r > n and the block is hflushed/appended. */ - static final Condition DEFAULT = new Condition() { + Condition DEFAULT = new Condition() { @Override public boolean satisfy(final short replication, final DatanodeInfo[] existings, final int n, final boolean isAppend, final boolean isHflushed) { - if (replication < 3) { - return false; - } else { - if (n <= (replication/2)) { - return true; - } else { - return isAppend || isHflushed; - } - } + return replication >= 3 && + (n <= (replication / 2) || isAppend || isHflushed); } }; /** Is the condition satisfied? */ - public boolean satisfy(short replication, DatanodeInfo[] existings, - int nExistings, boolean isAppend, boolean isHflushed); + boolean satisfy(short replication, DatanodeInfo[] existings, int nExistings, + boolean isAppend, boolean isHflushed); } private final Policy policy; private final boolean bestEffort; - + public ReplaceDatanodeOnFailure(Policy policy, boolean bestEffort) { this.policy = policy; this.bestEffort = bestEffort; @@ -124,7 +117,7 @@ public class ReplaceDatanodeOnFailure { * Best effort means that the client will try to replace the failed datanode * (provided that the policy is satisfied), however, it will continue the * write operation in case that the datanode replacement also fails. - * + * * @return Suppose the datanode replacement fails. * false: An exception should be thrown so that the write will fail. * true : The write should be resumed with the remaining datandoes. @@ -137,16 +130,13 @@ public class ReplaceDatanodeOnFailure { public boolean satisfy( final short replication, final DatanodeInfo[] existings, final boolean isAppend, final boolean isHflushed) { - final int n = existings == null? 0: existings.length; - if (n == 0 || n >= replication) { - //don't need to add datanode for any policy. - return false; - } else { - return policy.getCondition().satisfy( - replication, existings, n, isAppend, isHflushed); - } + final int n = existings == null ? 0 : existings.length; + //don't need to add datanode for any policy. + return !(n == 0 || n >= replication) && + policy.getCondition().satisfy(replication, existings, n, isAppend, + isHflushed); } - + @Override public String toString() { return policy.toString(); @@ -158,7 +148,7 @@ public class ReplaceDatanodeOnFailure { final boolean bestEffort = conf.getBoolean( HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT); - + return new ReplaceDatanodeOnFailure(policy, bestEffort); } @@ -197,4 +187,4 @@ public class ReplaceDatanodeOnFailure { HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, bestEffort); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java index d2bc348..6545681 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java @@ -61,12 +61,11 @@ public class Sender implements DataTransferProtocol { /** Create a sender for DataTransferProtocol with a output stream. */ public Sender(final DataOutputStream out) { - this.out = out; + this.out = out; } /** Initialize a operation. */ - private static void op(final DataOutput out, final Op op - ) throws IOException { + private static void op(final DataOutput out, final Op op) throws IOException { out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); op.write(out); } @@ -80,13 +79,14 @@ public class Sender implements DataTransferProtocol { out.flush(); } - static private CachingStrategyProto getCachingStrategy(CachingStrategy cachingStrategy) { + static private CachingStrategyProto getCachingStrategy( + CachingStrategy cachingStrategy) { CachingStrategyProto.Builder builder = CachingStrategyProto.newBuilder(); if (cachingStrategy.getReadahead() != null) { - builder.setReadahead(cachingStrategy.getReadahead().longValue()); + builder.setReadahead(cachingStrategy.getReadahead()); } if (cachingStrategy.getDropBehind() != null) { - builder.setDropBehind(cachingStrategy.getDropBehind().booleanValue()); + builder.setDropBehind(cachingStrategy.getDropBehind()); } return builder.build(); } @@ -101,24 +101,25 @@ public class Sender implements DataTransferProtocol { final CachingStrategy cachingStrategy) throws IOException { OpReadBlockProto proto = OpReadBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, blockToken)) - .setOffset(blockOffset) - .setLen(length) - .setSendChecksums(sendChecksum) - .setCachingStrategy(getCachingStrategy(cachingStrategy)) - .build(); + .setHeader(DataTransferProtoUtil.buildClientHeader(blk, clientName, + blockToken)) + .setOffset(blockOffset) + .setLen(length) + .setSendChecksums(sendChecksum) + .setCachingStrategy(getCachingStrategy(cachingStrategy)) + .build(); send(out, Op.READ_BLOCK, proto); } - + @Override public void writeBlock(final ExtendedBlock blk, - final StorageType storageType, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String clientName, final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes, + final StorageType[] targetStorageTypes, final DatanodeInfo source, final BlockConstructionStage stage, final int pipelineSize, @@ -132,26 +133,27 @@ public class Sender implements DataTransferProtocol { final boolean[] targetPinnings) throws IOException { ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader( blk, clientName, blockToken); - + ChecksumProto checksumProto = - DataTransferProtoUtil.toProto(requestedChecksum); + DataTransferProtoUtil.toProto(requestedChecksum); OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder() - .setHeader(header) - .setStorageType(PBHelperClient.convertStorageType(storageType)) - .addAllTargets(PBHelperClient.convert(targets, 1)) - .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes, 1)) - .setStage(toProto(stage)) - .setPipelineSize(pipelineSize) - .setMinBytesRcvd(minBytesRcvd) - .setMaxBytesRcvd(maxBytesRcvd) - .setLatestGenerationStamp(latestGenerationStamp) - .setRequestedChecksum(checksumProto) - .setCachingStrategy(getCachingStrategy(cachingStrategy)) - .setAllowLazyPersist(allowLazyPersist) - .setPinning(pinning) - .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1)); - + .setHeader(header) + .setStorageType(PBHelperClient.convertStorageType(storageType)) + .addAllTargets(PBHelperClient.convert(targets, 1)) + .addAllTargetStorageTypes( + PBHelperClient.convertStorageTypes(targetStorageTypes, 1)) + .setStage(toProto(stage)) + .setPipelineSize(pipelineSize) + .setMinBytesRcvd(minBytesRcvd) + .setMaxBytesRcvd(maxBytesRcvd) + .setLatestGenerationStamp(latestGenerationStamp) + .setRequestedChecksum(checksumProto) + .setCachingStrategy(getCachingStrategy(cachingStrategy)) + .setAllowLazyPersist(allowLazyPersist) + .setPinning(pinning) + .addAllTargetPinnings(PBHelperClient.convert(targetPinnings, 1)); + if (source != null) { proto.setSource(PBHelperClient.convertDatanodeInfo(source)); } @@ -165,13 +167,14 @@ public class Sender implements DataTransferProtocol { final String clientName, final DatanodeInfo[] targets, final StorageType[] targetStorageTypes) throws IOException { - + OpTransferBlockProto proto = OpTransferBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildClientHeader( - blk, clientName, blockToken)) - .addAllTargets(PBHelperClient.convert(targets)) - .addAllTargetStorageTypes(PBHelperClient.convertStorageTypes(targetStorageTypes)) - .build(); + .setHeader(DataTransferProtoUtil.buildClientHeader( + blk, clientName, blockToken)) + .addAllTargets(PBHelperClient.convert(targets)) + .addAllTargetStorageTypes( + PBHelperClient.convertStorageTypes(targetStorageTypes)) + .build(); send(out, Op.TRANSFER_BLOCK, proto); } @@ -180,11 +183,11 @@ public class Sender implements DataTransferProtocol { public void requestShortCircuitFds(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, SlotId slotId, int maxVersion, boolean supportsReceiptVerification) - throws IOException { + throws IOException { OpRequestShortCircuitAccessProto.Builder builder = OpRequestShortCircuitAccessProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader( - blk, blockToken)).setMaxVersion(maxVersion); + .setHeader(DataTransferProtoUtil.buildBaseHeader( + blk, blockToken)).setMaxVersion(maxVersion); if (slotId != null) { builder.setSlotId(PBHelperClient.convert(slotId)); } @@ -192,12 +195,12 @@ public class Sender implements DataTransferProtocol { OpRequestShortCircuitAccessProto proto = builder.build(); send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); } - + @Override public void releaseShortCircuitFds(SlotId slotId) throws IOException { ReleaseShortCircuitAccessRequestProto.Builder builder = ReleaseShortCircuitAccessRequestProto.newBuilder(). - setSlotId(PBHelperClient.convert(slotId)); + setSlotId(PBHelperClient.convert(slotId)); SpanId spanId = Tracer.getCurrentSpanId(); if (spanId.isValid()) { builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder(). @@ -212,7 +215,7 @@ public class Sender implements DataTransferProtocol { public void requestShortCircuitShm(String clientName) throws IOException { ShortCircuitShmRequestProto.Builder builder = ShortCircuitShmRequestProto.newBuilder(). - setClientName(clientName); + setClientName(clientName); SpanId spanId = Tracer.getCurrentSpanId(); if (spanId.isValid()) { builder.setTraceInfo(DataTransferTraceInfoProto.newBuilder(). @@ -222,20 +225,20 @@ public class Sender implements DataTransferProtocol { ShortCircuitShmRequestProto proto = builder.build(); send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto); } - + @Override public void replaceBlock(final ExtendedBlock blk, - final StorageType storageType, + final StorageType storageType, final Token<BlockTokenIdentifier> blockToken, final String delHint, final DatanodeInfo source) throws IOException { OpReplaceBlockProto proto = OpReplaceBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) - .setStorageType(PBHelperClient.convertStorageType(storageType)) - .setDelHint(delHint) - .setSource(PBHelperClient.convertDatanodeInfo(source)) - .build(); - + .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .setStorageType(PBHelperClient.convertStorageType(storageType)) + .setDelHint(delHint) + .setSource(PBHelperClient.convertDatanodeInfo(source)) + .build(); + send(out, Op.REPLACE_BLOCK, proto); } @@ -243,9 +246,9 @@ public class Sender implements DataTransferProtocol { public void copyBlock(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken) throws IOException { OpCopyBlockProto proto = OpCopyBlockProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) - .build(); - + .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .build(); + send(out, Op.COPY_BLOCK, proto); } @@ -253,9 +256,9 @@ public class Sender implements DataTransferProtocol { public void blockChecksum(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken) throws IOException { OpBlockChecksumProto proto = OpBlockChecksumProto.newBuilder() - .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) - .build(); - + .setHeader(DataTransferProtoUtil.buildBaseHeader(blk, blockToken)) + .build(); + send(out, Op.BLOCK_CHECKSUM, proto); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java index 3846f4a..71e5abd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/TrustedChannelResolver.java @@ -29,7 +29,7 @@ import org.apache.hadoop.util.ReflectionUtils; * The default implementation is to return false indicating that * the channel is not trusted. * This class can be overridden to provide custom logic to determine - * whether a channel is trusted or not. + * whether a channel is trusted or not. * The custom class can be specified via configuration. * */ @@ -39,14 +39,13 @@ public class TrustedChannelResolver implements Configurable { /** * Returns an instance of TrustedChannelResolver. * Looks up the configuration to see if there is custom class specified. - * @param conf * @return TrustedChannelResolver */ public static TrustedChannelResolver getInstance(Configuration conf) { Class<? extends TrustedChannelResolver> clazz = - conf.getClass( - HdfsClientConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS, - TrustedChannelResolver.class, TrustedChannelResolver.class); + conf.getClass( + HdfsClientConfigKeys.DFS_TRUSTEDCHANNEL_RESOLVER_CLASS, + TrustedChannelResolver.class, TrustedChannelResolver.class); return ReflectionUtils.newInstance(clazz, conf); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java index 006d304..5e07550 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/DataTransferSaslUtil.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -69,7 +69,7 @@ import com.google.protobuf.ByteString; public final class DataTransferSaslUtil { private static final Logger LOG = LoggerFactory.getLogger( - DataTransferSaslUtil.class); + DataTransferSaslUtil.class); /** * Delimiter for the three-part SASL username string. @@ -97,20 +97,20 @@ public final class DataTransferSaslUtil { throw new IOException("Failed to complete SASL handshake"); } Set<String> requestedQop = ImmutableSet.copyOf(Arrays.asList( - saslProps.get(Sasl.QOP).split(","))); + saslProps.get(Sasl.QOP).split(","))); String negotiatedQop = sasl.getNegotiatedQop(); LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}", - requestedQop, negotiatedQop); + requestedQop, negotiatedQop); if (!requestedQop.contains(negotiatedQop)) { throw new IOException(String.format("SASL handshake completed, but " + - "channel does not have acceptable quality of protection, " + - "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); + "channel does not have acceptable quality of protection, " + + "requested = %s, negotiated = %s", requestedQop, negotiatedQop)); } } - + /** * Check whether requested SASL Qop contains privacy. - * + * * @param saslProps properties of SASL negotiation * @return boolean true if privacy exists */ @@ -145,7 +145,7 @@ public final class DataTransferSaslUtil { */ public static char[] encryptionKeyToPassword(byte[] encryptionKey) { return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8) - .toCharArray(); + .toCharArray(); } /** @@ -153,7 +153,6 @@ public final class DataTransferSaslUtil { * [host][/ip-address]:port. The host may be missing. The IP address (and * preceding '/') may be missing. The port preceded by ':' is always present. * - * @param peer * @return InetAddress from peer */ public static InetAddress getPeerAddress(Peer peer) { @@ -181,23 +180,26 @@ public final class DataTransferSaslUtil { String qops = conf.get(DFS_DATA_TRANSFER_PROTECTION_KEY); if (qops == null || qops.isEmpty()) { LOG.debug("DataTransferProtocol not using SaslPropertiesResolver, no " + - "QOP found in configuration for {}", DFS_DATA_TRANSFER_PROTECTION_KEY); + "QOP found in configuration for {}", + DFS_DATA_TRANSFER_PROTECTION_KEY); return null; } Configuration saslPropsResolverConf = new Configuration(conf); saslPropsResolverConf.set(HADOOP_RPC_PROTECTION, qops); Class<? extends SaslPropertiesResolver> resolverClass = conf.getClass( - HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS, - SaslPropertiesResolver.class, SaslPropertiesResolver.class); - resolverClass = conf.getClass(DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, - resolverClass, SaslPropertiesResolver.class); + HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS, + SaslPropertiesResolver.class, SaslPropertiesResolver.class); + resolverClass = + conf.getClass(DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, + resolverClass, SaslPropertiesResolver.class); saslPropsResolverConf.setClass(HADOOP_SECURITY_SASL_PROPS_RESOLVER_CLASS, - resolverClass, SaslPropertiesResolver.class); + resolverClass, SaslPropertiesResolver.class); SaslPropertiesResolver resolver = SaslPropertiesResolver.getInstance( - saslPropsResolverConf); + saslPropsResolverConf); LOG.debug("DataTransferProtocol using SaslPropertiesResolver, configured " + - "QOP {} = {}, configured class {} = {}", DFS_DATA_TRANSFER_PROTECTION_KEY, qops, - DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, resolverClass); + "QOP {} = {}, configured class {} = {}", + DFS_DATA_TRANSFER_PROTECTION_KEY, qops, + DFS_DATA_TRANSFER_SASL_PROPS_RESOLVER_CLASS_KEY, resolverClass); return resolver; } @@ -219,10 +221,10 @@ public final class DataTransferSaslUtil { return proto.getPayload().toByteArray(); } } - + /** - * Reads a SASL negotiation message and negotiation cipher options. - * + * Reads a SASL negotiation message and negotiation cipher options. + * * @param in stream to read * @param cipherOptions list to store negotiation cipher options * @return byte[] SASL negotiation message @@ -246,10 +248,10 @@ public final class DataTransferSaslUtil { return proto.getPayload().toByteArray(); } } - + /** * Negotiate a cipher option which server supports. - * + * * @param conf the configuration * @param options the cipher options which client supports * @return CipherOption negotiated cipher option @@ -279,6 +281,7 @@ public final class DataTransferSaslUtil { byte[] inIv = new byte[suite.getAlgorithmBlockSize()]; byte[] outKey = new byte[keyLen]; byte[] outIv = new byte[suite.getAlgorithmBlockSize()]; + assert codec != null; codec.generateSecureRandom(inKey); codec.generateSecureRandom(inIv); codec.generateSecureRandom(outKey); @@ -289,21 +292,21 @@ public final class DataTransferSaslUtil { } return null; } - + /** * Send SASL message and negotiated cipher option to client. - * + * * @param out stream to receive message * @param payload to send * @param option negotiated cipher option * @throws IOException for any error */ public static void sendSaslMessageAndNegotiatedCipherOption( - OutputStream out, byte[] payload, CipherOption option) - throws IOException { + OutputStream out, byte[] payload, CipherOption option) + throws IOException { DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto.newBuilder(); - + builder.setStatus(DataTransferEncryptorStatus.SUCCESS); if (payload != null) { builder.setPayload(ByteString.copyFrom(payload)); @@ -311,16 +314,16 @@ public final class DataTransferSaslUtil { if (option != null) { builder.addCipherOption(PBHelperClient.convert(option)); } - + DataTransferEncryptorMessageProto proto = builder.build(); proto.writeDelimitedTo(out); out.flush(); } - + /** * Create IOStreamPair of {@link org.apache.hadoop.crypto.CryptoInputStream} * and {@link org.apache.hadoop.crypto.CryptoOutputStream} - * + * * @param conf the configuration * @param cipherOption negotiated cipher option * @param out underlying output stream @@ -330,7 +333,7 @@ public final class DataTransferSaslUtil { * @throws IOException for any error */ public static IOStreamPair createStreamPair(Configuration conf, - CipherOption cipherOption, OutputStream out, InputStream in, + CipherOption cipherOption, OutputStream out, InputStream in, boolean isServer) throws IOException { LOG.debug("Creating IOStreamPair of CryptoInputStream and " + "CryptoOutputStream."); @@ -340,9 +343,9 @@ public final class DataTransferSaslUtil { byte[] inIv = cipherOption.getInIv(); byte[] outKey = cipherOption.getOutKey(); byte[] outIv = cipherOption.getOutIv(); - InputStream cIn = new CryptoInputStream(in, codec, + InputStream cIn = new CryptoInputStream(in, codec, isServer ? inKey : outKey, isServer ? inIv : outIv); - OutputStream cOut = new CryptoOutputStream(out, codec, + OutputStream cOut = new CryptoOutputStream(out, codec, isServer ? outKey : inKey, isServer ? outIv : inIv); return new IOStreamPair(cIn, cOut); } @@ -370,10 +373,10 @@ public final class DataTransferSaslUtil { throws IOException { sendSaslMessage(out, DataTransferEncryptorStatus.SUCCESS, payload, null); } - + /** * Send a SASL negotiation message and negotiation cipher options to server. - * + * * @param out stream to receive message * @param payload to send * @param options cipher options to negotiate @@ -381,10 +384,10 @@ public final class DataTransferSaslUtil { */ public static void sendSaslMessageAndNegotiationCipherOptions( OutputStream out, byte[] payload, List<CipherOption> options) - throws IOException { + throws IOException { DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto.newBuilder(); - + builder.setStatus(DataTransferEncryptorStatus.SUCCESS); if (payload != null) { builder.setPayload(ByteString.copyFrom(payload)); @@ -392,23 +395,23 @@ public final class DataTransferSaslUtil { if (options != null) { builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options)); } - + DataTransferEncryptorMessageProto proto = builder.build(); proto.writeDelimitedTo(out); out.flush(); } - + /** * Read SASL message and negotiated cipher option from server. - * + * * @param in stream to read - * @return SaslResponseWithNegotiatedCipherOption SASL message and + * @return SaslResponseWithNegotiatedCipherOption SASL message and * negotiated cipher option * @throws IOException for any error */ public static SaslResponseWithNegotiatedCipherOption readSaslMessageAndNegotiatedCipherOption(InputStream in) - throws IOException { + throws IOException { DataTransferEncryptorMessageProto proto = DataTransferEncryptorMessageProto.parseFrom(vintPrefixed(in)); if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) { @@ -426,17 +429,17 @@ public final class DataTransferSaslUtil { return new SaslResponseWithNegotiatedCipherOption(response, option); } } - + /** * Encrypt the key and iv of the negotiated cipher option. - * + * * @param option negotiated cipher option * @param sasl SASL participant representing server - * @return CipherOption negotiated cipher option which contains the + * @return CipherOption negotiated cipher option which contains the * encrypted key and iv * @throws IOException for any error */ - public static CipherOption wrap(CipherOption option, SaslParticipant sasl) + public static CipherOption wrap(CipherOption option, SaslParticipant sasl) throws IOException { if (option != null) { byte[] inKey = option.getInKey(); @@ -450,16 +453,16 @@ public final class DataTransferSaslUtil { return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, option.getOutIv()); } - + return null; } - + /** * Decrypt the key and iv of the negotiated cipher option. - * + * * @param option negotiated cipher option * @param sasl SASL participant representing client - * @return CipherOption negotiated cipher option which contains the + * @return CipherOption negotiated cipher option which contains the * decrypted key and iv * @throws IOException for any error */ @@ -477,7 +480,7 @@ public final class DataTransferSaslUtil { return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey, option.getOutIv()); } - + return null; } @@ -492,10 +495,10 @@ public final class DataTransferSaslUtil { */ public static void sendSaslMessage(OutputStream out, DataTransferEncryptorStatus status, byte[] payload, String message) - throws IOException { + throws IOException { DataTransferEncryptorMessageProto.Builder builder = DataTransferEncryptorMessageProto.newBuilder(); - + builder.setStatus(status); if (payload != null) { builder.setPayload(ByteString.copyFrom(payload)); @@ -503,7 +506,7 @@ public final class DataTransferSaslUtil { if (message != null) { builder.setMessage(message); } - + DataTransferEncryptorMessageProto proto = builder.build(); proto.writeDelimitedTo(out); out.flush(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java index 913203c..447d6e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslDataTransferClient.java @@ -76,7 +76,7 @@ import com.google.common.collect.Lists; public class SaslDataTransferClient { private static final Logger LOG = LoggerFactory.getLogger( - SaslDataTransferClient.class); + SaslDataTransferClient.class); private final Configuration conf; private final AtomicBoolean fallbackToSimpleAuth; @@ -94,7 +94,7 @@ public class SaslDataTransferClient { * @param trustedChannelResolver for identifying trusted connections that do * not require SASL negotiation */ - public SaslDataTransferClient(Configuration conf, + public SaslDataTransferClient(Configuration conf, SaslPropertiesResolver saslPropsResolver, TrustedChannelResolver trustedChannelResolver) { this(conf, saslPropsResolver, trustedChannelResolver, null); @@ -110,7 +110,7 @@ public class SaslDataTransferClient { * @param fallbackToSimpleAuth checked on each attempt at general SASL * handshake, if true forces use of simple auth */ - public SaslDataTransferClient(Configuration conf, + public SaslDataTransferClient(Configuration conf, SaslPropertiesResolver saslPropsResolver, TrustedChannelResolver trustedChannelResolver, AtomicBoolean fallbackToSimpleAuth) { @@ -138,9 +138,9 @@ public class SaslDataTransferClient { throws IOException { // The encryption key factory only returns a key if encryption is enabled. DataEncryptionKey encryptionKey = !trustedChannelResolver.isTrusted() ? - encryptionKeyFactory.newDataEncryptionKey() : null; + encryptionKeyFactory.newDataEncryptionKey() : null; IOStreamPair ios = send(socket.getInetAddress(), underlyingOut, - underlyingIn, encryptionKey, accessToken, datanodeId); + underlyingIn, encryptionKey, accessToken, datanodeId); return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut); } @@ -158,8 +158,8 @@ public class SaslDataTransferClient { Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) throws IOException { IOStreamPair ios = checkTrustAndSend(getPeerAddress(peer), - peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory, - accessToken, datanodeId); + peer.getOutputStream(), peer.getInputStream(), encryptionKeyFactory, + accessToken, datanodeId); // TODO: Consider renaming EncryptedPeer to SaslPeer. return ios != null ? new EncryptedPeer(peer, ios) : peer; } @@ -181,7 +181,7 @@ public class SaslDataTransferClient { Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) throws IOException { IOStreamPair ios = checkTrustAndSend(socket.getInetAddress(), underlyingOut, - underlyingIn, encryptionKeyFactory, accessToken, datanodeId); + underlyingIn, encryptionKeyFactory, accessToken, datanodeId); return ios != null ? ios : new IOStreamPair(underlyingIn, underlyingOut); } @@ -207,13 +207,13 @@ public class SaslDataTransferClient { !trustedChannelResolver.isTrusted(addr)) { // The encryption key factory only returns a key if encryption is enabled. DataEncryptionKey encryptionKey = - encryptionKeyFactory.newDataEncryptionKey(); + encryptionKeyFactory.newDataEncryptionKey(); return send(addr, underlyingOut, underlyingIn, encryptionKey, accessToken, - datanodeId); + datanodeId); } else { LOG.debug( - "SASL client skipping handshake on trusted connection for addr = {}, " - + "datanodeId = {}", addr, datanodeId); + "SASL client skipping handshake on trusted connection for addr = {}, " + + "datanodeId = {}", addr, datanodeId); return null; } } @@ -236,40 +236,38 @@ public class SaslDataTransferClient { Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) throws IOException { if (encryptionKey != null) { - LOG.debug( - "SASL client doing encrypted handshake for addr = {}, datanodeId = {}", - addr, datanodeId); + LOG.debug("SASL client doing encrypted handshake for addr = {}, " + + "datanodeId = {}", addr, datanodeId); return getEncryptedStreams(underlyingOut, underlyingIn, - encryptionKey); + encryptionKey); } else if (!UserGroupInformation.isSecurityEnabled()) { - LOG.debug( - "SASL client skipping handshake in unsecured configuration for " - + "addr = {}, datanodeId = {}", addr, datanodeId); + LOG.debug("SASL client skipping handshake in unsecured configuration for " + + "addr = {}, datanodeId = {}", addr, datanodeId); return null; } else if (SecurityUtil.isPrivilegedPort(datanodeId.getXferPort())) { LOG.debug( - "SASL client skipping handshake in secured configuration with " - + "privileged port for addr = {}, datanodeId = {}", addr, datanodeId); + "SASL client skipping handshake in secured configuration with " + + "privileged port for addr = {}, datanodeId = {}", + addr, datanodeId); return null; } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) { LOG.debug( - "SASL client skipping handshake in secured configuration with " - + "unsecured cluster for addr = {}, datanodeId = {}", addr, datanodeId); + "SASL client skipping handshake in secured configuration with " + + "unsecured cluster for addr = {}, datanodeId = {}", + addr, datanodeId); return null; } else if (saslPropsResolver != null) { LOG.debug( - "SASL client doing general handshake for addr = {}, datanodeId = {}", - addr, datanodeId); - return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken, - datanodeId); + "SASL client doing general handshake for addr = {}, datanodeId = {}", + addr, datanodeId); + return getSaslStreams(addr, underlyingOut, underlyingIn, accessToken); } else { // It's a secured cluster using non-privileged ports, but no SASL. The // only way this can happen is if the DataNode has - // ignore.secure.ports.for.testing configured, so this is a rare edge case. - LOG.debug( - "SASL client skipping handshake in secured configuration with no SASL " - + "protection configured for addr = {}, datanodeId = {}", - addr, datanodeId); + // ignore.secure.ports.for.testing configured so this is a rare edge case. + LOG.debug("SASL client skipping handshake in secured configuration with " + + "no SASL protection configured for addr = {}, datanodeId = {}", + addr, datanodeId); return null; } } @@ -287,24 +285,24 @@ public class SaslDataTransferClient { InputStream underlyingIn, DataEncryptionKey encryptionKey) throws IOException { Map<String, String> saslProps = createSaslPropertiesForEncryption( - encryptionKey.encryptionAlgorithm); + encryptionKey.encryptionAlgorithm); LOG.debug("Client using encryption algorithm {}", - encryptionKey.encryptionAlgorithm); + encryptionKey.encryptionAlgorithm); String userName = getUserNameFromEncryptionKey(encryptionKey); char[] password = encryptionKeyToPassword(encryptionKey.encryptionKey); CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName, - password); + password); return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps, - callbackHandler); + callbackHandler); } /** * The SASL username for an encrypted handshake consists of the keyId, * blockPoolId, and nonce with the first two encoded as Strings, and the third * encoded using Base64. The fields are each separated by a single space. - * + * * @param encryptionKey the encryption key to encode as a SASL username. * @return encoded username containing keyId, blockPoolId, and nonce */ @@ -312,7 +310,8 @@ public class SaslDataTransferClient { DataEncryptionKey encryptionKey) { return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER + - new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8); + new String(Base64.encodeBase64(encryptionKey.nonce, false), + Charsets.UTF_8); } /** @@ -328,7 +327,7 @@ public class SaslDataTransferClient { * Creates a new SaslClientCallbackHandler. * * @param userName SASL user name - * @Param password SASL password + * @param password SASL password */ public SaslClientCallbackHandler(String userName, char[] password) { this.password = password; @@ -342,15 +341,13 @@ public class SaslDataTransferClient { PasswordCallback pc = null; RealmCallback rc = null; for (Callback callback : callbacks) { - if (callback instanceof RealmChoiceCallback) { - continue; - } else if (callback instanceof NameCallback) { + if (callback instanceof NameCallback) { nc = (NameCallback) callback; } else if (callback instanceof PasswordCallback) { pc = (PasswordCallback) callback; } else if (callback instanceof RealmCallback) { rc = (RealmCallback) callback; - } else { + } else if (!(callback instanceof RealmChoiceCallback)) { throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback"); } @@ -374,22 +371,21 @@ public class SaslDataTransferClient { * @param underlyingOut connection output stream * @param underlyingIn connection input stream * @param accessToken connection block access token - * @param datanodeId ID of destination DataNode * @return new pair of streams, wrapped after SASL negotiation * @throws IOException for any error */ private IOStreamPair getSaslStreams(InetAddress addr, OutputStream underlyingOut, InputStream underlyingIn, - Token<BlockTokenIdentifier> accessToken, DatanodeID datanodeId) + Token<BlockTokenIdentifier> accessToken) throws IOException { Map<String, String> saslProps = saslPropsResolver.getClientProperties(addr); String userName = buildUserName(accessToken); char[] password = buildClientPassword(accessToken); CallbackHandler callbackHandler = new SaslClientCallbackHandler(userName, - password); + password); return doSaslHandshake(underlyingOut, underlyingIn, userName, saslProps, - callbackHandler); + callbackHandler); } /** @@ -404,7 +400,7 @@ public class SaslDataTransferClient { */ private static String buildUserName(Token<BlockTokenIdentifier> blockToken) { return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), - Charsets.UTF_8); + Charsets.UTF_8); } /** @@ -413,10 +409,10 @@ public class SaslDataTransferClient { * * @param blockToken for block access * @return SASL password - */ + */ private char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) { return new String(Base64.encodeBase64(blockToken.getPassword(), false), - Charsets.UTF_8).toCharArray(); + Charsets.UTF_8).toCharArray(); } /** @@ -438,7 +434,7 @@ public class SaslDataTransferClient { DataInputStream in = new DataInputStream(underlyingIn); SaslParticipant sasl= SaslParticipant.createClientSaslParticipant(userName, - saslProps, callbackHandler); + saslProps, callbackHandler); out.writeInt(SASL_TRANSFER_MAGIC_NUMBER); out.flush(); @@ -467,11 +463,11 @@ public class SaslDataTransferClient { cipherOptions.add(option); } } - sendSaslMessageAndNegotiationCipherOptions(out, localResponse, + sendSaslMessageAndNegotiationCipherOptions(out, localResponse, cipherOptions); // step 2 (client-side only) - SaslResponseWithNegotiatedCipherOption response = + SaslResponseWithNegotiatedCipherOption response = readSaslMessageAndNegotiatedCipherOption(in); localResponse = sasl.evaluateChallengeOrResponse(response.payload); assert localResponse == null; @@ -485,11 +481,11 @@ public class SaslDataTransferClient { cipherOption = unwrap(response.cipherOption, sasl); } - // If negotiated cipher option is not null, we will use it to create + // If negotiated cipher option is not null, we will use it to create // stream pair. return cipherOption != null ? createStreamPair( - conf, cipherOption, underlyingOut, underlyingIn, false) : - sasl.createStreamPair(out, in); + conf, cipherOption, underlyingOut, underlyingIn, false) : + sasl.createStreamPair(out, in); } catch (IOException ioe) { sendGenericSaslErrorMessage(out, ioe.getMessage()); throw ioe; http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java index f14a075..1db9f50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslParticipant.java @@ -129,20 +129,20 @@ class SaslParticipant { return (String) saslServer.getNegotiatedProperty(Sasl.QOP); } } - + /** * After successful SASL negotiation, returns whether it's QOP privacy - * + * * @return boolean whether it's QOP privacy */ public boolean isNegotiatedQopPrivacy() { String qop = getNegotiatedQop(); return qop != null && "auth-conf".equalsIgnoreCase(qop); } - + /** * Wraps a byte array. - * + * * @param bytes The array containing the bytes to wrap. * @param off The starting position at the array * @param len The number of bytes to wrap @@ -156,10 +156,10 @@ class SaslParticipant { return saslServer.wrap(bytes, off, len); } } - + /** * Unwraps a byte array. - * + * * @param bytes The array containing the bytes to unwrap. * @param off The starting position at the array * @param len The number of bytes to unwrap http://git-wip-us.apache.org/repos/asf/hadoop/blob/1257483e/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java index f69441b..c1eef4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/sasl/SaslResponseWithNegotiatedCipherOption.java @@ -24,10 +24,10 @@ import org.apache.hadoop.crypto.CipherOption; public class SaslResponseWithNegotiatedCipherOption { final byte[] payload; final CipherOption cipherOption; - - public SaslResponseWithNegotiatedCipherOption(byte[] payload, + + public SaslResponseWithNegotiatedCipherOption(byte[] payload, CipherOption cipherOption) { this.payload = payload; this.cipherOption = cipherOption; } -} \ No newline at end of file +}
