http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java deleted file mode 100644 index 4a016bd..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java +++ /dev/null @@ -1,1903 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs; - -import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS; - -import java.io.BufferedOutputStream; -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.io.InterruptedIOException; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.InetSocketAddress; -import java.net.Socket; -import java.nio.channels.ClosedChannelException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.StorageType; -import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite; -import org.apache.hadoop.hdfs.client.impl.DfsClientConf; -import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsConstants; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; -import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException; -import org.apache.hadoop.hdfs.protocol.QuotaByStorageTypeExceededException; -import org.apache.hadoop.hdfs.protocol.QuotaExceededException; -import org.apache.hadoop.hdfs.protocol.UnresolvedPathException; -import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil; -import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; -import 
org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; -import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException; -import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader; -import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck; -import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto; -import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; -import org.apache.hadoop.hdfs.protocolPB.PBHelperClient; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; -import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; -import org.apache.hadoop.hdfs.util.ByteArrayManager; -import org.apache.hadoop.io.IOUtils; -import org.apache.hadoop.io.MultipleIOException; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Daemon; -import org.apache.hadoop.util.DataChecksum; -import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.Time; -import org.apache.htrace.NullScope; -import org.apache.htrace.Sampler; -import org.apache.htrace.Span; -import org.apache.htrace.Trace; -import org.apache.htrace.TraceInfo; -import org.apache.htrace.TraceScope; - -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.cache.RemovalListener; -import com.google.common.cache.RemovalNotification; - -/********************************************************************* - * - * The DataStreamer class is responsible for sending data packets to the - * datanodes in the pipeline. 
It retrieves a new blockid and block locations - * from the namenode, and starts streaming packets to the pipeline of - * Datanodes. Every packet has a sequence number associated with - * it. When all the packets for a block are sent out and acks for each - * of them are received, the DataStreamer closes the current block. - * - * The DataStreamer thread picks up packets from the dataQueue, sends it to - * the first datanode in the pipeline and moves it from the dataQueue to the - * ackQueue. The ResponseProcessor receives acks from the datanodes. When a - * successful ack for a packet is received from all datanodes, the - * ResponseProcessor removes the corresponding packet from the ackQueue. - * - * In case of error, all outstanding packets are moved from ackQueue. A new - * pipeline is setup by eliminating the bad datanode from the original - * pipeline. The DataStreamer now starts sending packets from the dataQueue. - * - *********************************************************************/ - -@InterfaceAudience.Private -class DataStreamer extends Daemon { - static final Log LOG = LogFactory.getLog(DataStreamer.class); - - /** - * Create a socket for a write pipeline - * - * @param first the first datanode - * @param length the pipeline length - * @param client client - * @return the socket connected to the first datanode - */ - static Socket createSocketForPipeline(final DatanodeInfo first, - final int length, final DFSClient client) throws IOException { - final DfsClientConf conf = client.getConf(); - final String dnAddr = first.getXferAddr(conf.isConnectToDnViaHostname()); - if (LOG.isDebugEnabled()) { - LOG.debug("Connecting to datanode " + dnAddr); - } - final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr); - final Socket sock = client.socketFactory.createSocket(); - final int timeout = client.getDatanodeReadTimeout(length); - NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), conf.getSocketTimeout()); - sock.setSoTimeout(timeout); - 
sock.setSendBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE); - if (LOG.isDebugEnabled()) { - LOG.debug("Send buf size " + sock.getSendBufferSize()); - } - return sock; - } - - /** - * if this file is lazy persist - * - * @param stat the HdfsFileStatus of a file - * @return if this file is lazy persist - */ - static boolean isLazyPersist(HdfsFileStatus stat) { - return stat.getStoragePolicy() == HdfsConstants.MEMORY_STORAGE_POLICY_ID; - } - - /** - * release a list of packets to ByteArrayManager - * - * @param packets packets to be release - * @param bam ByteArrayManager - */ - private static void releaseBuffer(List<DFSPacket> packets, ByteArrayManager bam) { - for(DFSPacket p : packets) { - p.releaseBuffer(bam); - } - packets.clear(); - } - - static class LastExceptionInStreamer { - private IOException thrown; - - synchronized void set(Throwable t) { - assert t != null; - this.thrown = t instanceof IOException ? - (IOException) t : new IOException(t); - } - - synchronized void clear() { - thrown = null; - } - - /** Check if there already is an exception. 
*/ - synchronized void check(boolean resetToNull) throws IOException { - if (thrown != null) { - if (LOG.isTraceEnabled()) { - // wrap and print the exception to know when the check is called - LOG.trace("Got Exception while checking", new Throwable(thrown)); - } - final IOException e = thrown; - if (resetToNull) { - thrown = null; - } - throw e; - } - } - - synchronized void throwException4Close() throws IOException { - check(false); - throw new ClosedChannelException(); - } - } - - static class ErrorState { - private boolean error = false; - private int badNodeIndex = -1; - private int restartingNodeIndex = -1; - private long restartingNodeDeadline = 0; - private final long datanodeRestartTimeout; - - ErrorState(long datanodeRestartTimeout) { - this.datanodeRestartTimeout = datanodeRestartTimeout; - } - - synchronized void reset() { - error = false; - badNodeIndex = -1; - restartingNodeIndex = -1; - restartingNodeDeadline = 0; - } - - synchronized boolean hasError() { - return error; - } - - synchronized boolean hasDatanodeError() { - return error && isNodeMarked(); - } - - synchronized void setError(boolean err) { - this.error = err; - } - - synchronized void setBadNodeIndex(int index) { - this.badNodeIndex = index; - } - - synchronized int getBadNodeIndex() { - return badNodeIndex; - } - - synchronized int getRestartingNodeIndex() { - return restartingNodeIndex; - } - - synchronized void initRestartingNode(int i, String message) { - restartingNodeIndex = i; - restartingNodeDeadline = Time.monotonicNow() + datanodeRestartTimeout; - // If the data streamer has already set the primary node - // bad, clear it. It is likely that the write failed due to - // the DN shutdown. Even if it was a real failure, the pipeline - // recovery will take care of it. 
- badNodeIndex = -1; - LOG.info(message); - } - - synchronized boolean isRestartingNode() { - return restartingNodeIndex >= 0; - } - - synchronized boolean isNodeMarked() { - return badNodeIndex >= 0 || isRestartingNode(); - } - - /** - * This method is used when no explicit error report was received, but - * something failed. The first node is a suspect or unsure about the cause - * so that it is marked as failed. - */ - synchronized void markFirstNodeIfNotMarked() { - // There should be no existing error and no ongoing restart. - if (!isNodeMarked()) { - badNodeIndex = 0; - } - } - - synchronized void adjustState4RestartingNode() { - // Just took care of a node error while waiting for a node restart - if (restartingNodeIndex >= 0) { - // If the error came from a node further away than the restarting - // node, the restart must have been complete. - if (badNodeIndex > restartingNodeIndex) { - restartingNodeIndex = -1; - } else if (badNodeIndex < restartingNodeIndex) { - // the node index has shifted. - restartingNodeIndex--; - } else { - throw new IllegalStateException("badNodeIndex = " + badNodeIndex - + " = restartingNodeIndex = " + restartingNodeIndex); - } - } - - if (!isRestartingNode()) { - error = false; - } - badNodeIndex = -1; - } - - synchronized void checkRestartingNodeDeadline(DatanodeInfo[] nodes) { - if (restartingNodeIndex >= 0) { - if (!error) { - throw new IllegalStateException("error=false while checking" + - " restarting node deadline"); - } - - // check badNodeIndex - if (badNodeIndex == restartingNodeIndex) { - // ignore, if came from the restarting node - badNodeIndex = -1; - } - // not within the deadline - if (Time.monotonicNow() >= restartingNodeDeadline) { - // expired. declare the restarting node dead - restartingNodeDeadline = 0; - final int i = restartingNodeIndex; - restartingNodeIndex = -1; - LOG.warn("Datanode " + i + " did not restart within " - + datanodeRestartTimeout + "ms: " + nodes[i]); - // Mark the restarting node as failed. 
If there is any other failed - // node during the last pipeline construction attempt, it will not be - // overwritten/dropped. In this case, the restarting node will get - // excluded in the following attempt, if it still does not come up. - if (badNodeIndex == -1) { - badNodeIndex = i; - } - } - } - } - } - - private volatile boolean streamerClosed = false; - private ExtendedBlock block; // its length is number of bytes acked - private Token<BlockTokenIdentifier> accessToken; - private DataOutputStream blockStream; - private DataInputStream blockReplyStream; - private ResponseProcessor response = null; - private volatile DatanodeInfo[] nodes = null; // list of targets for current block - private volatile StorageType[] storageTypes = null; - private volatile String[] storageIDs = null; - private final ErrorState errorState; - - private BlockConstructionStage stage; // block construction stage - private long bytesSent = 0; // number of bytes that've been sent - private final boolean isLazyPersistFile; - - /** Nodes have been used in the pipeline before and have failed. */ - private final List<DatanodeInfo> failed = new ArrayList<>(); - /** The last ack sequence number before pipeline failure. */ - private long lastAckedSeqnoBeforeFailure = -1; - private int pipelineRecoveryCount = 0; - /** Has the current block been hflushed? */ - private boolean isHflushed = false; - /** Append on an existing block? */ - private final boolean isAppend; - - private long currentSeqno = 0; - private long lastQueuedSeqno = -1; - private long lastAckedSeqno = -1; - private long bytesCurBlock = 0; // bytes written in current block - private final LastExceptionInStreamer lastException = new LastExceptionInStreamer(); - private Socket s; - - private final DFSClient dfsClient; - private final String src; - /** Only for DataTransferProtocol.writeBlock(..) 
*/ - private final DataChecksum checksum4WriteBlock; - private final Progressable progress; - private final HdfsFileStatus stat; - // appending to existing partial block - private volatile boolean appendChunk = false; - // both dataQueue and ackQueue are protected by dataQueue lock - private final LinkedList<DFSPacket> dataQueue = new LinkedList<>(); - private final LinkedList<DFSPacket> ackQueue = new LinkedList<>(); - private final AtomicReference<CachingStrategy> cachingStrategy; - private final ByteArrayManager byteArrayManager; - //persist blocks on namenode - private final AtomicBoolean persistBlocks = new AtomicBoolean(false); - private boolean failPacket = false; - private final long dfsclientSlowLogThresholdMs; - private long artificialSlowdown = 0; - // List of congested data nodes. The stream will back off if the DataNodes - // are congested - private final List<DatanodeInfo> congestedNodes = new ArrayList<>(); - private static final int CONGESTION_BACKOFF_MEAN_TIME_IN_MS = 5000; - private static final int CONGESTION_BACK_OFF_MAX_TIME_IN_MS = - CONGESTION_BACKOFF_MEAN_TIME_IN_MS * 10; - private int lastCongestionBackoffTime; - - private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes; - private final String[] favoredNodes; - - private DataStreamer(HdfsFileStatus stat, DFSClient dfsClient, String src, - Progressable progress, DataChecksum checksum, - AtomicReference<CachingStrategy> cachingStrategy, - ByteArrayManager byteArrayManage, - boolean isAppend, String[] favoredNodes) { - this.dfsClient = dfsClient; - this.src = src; - this.progress = progress; - this.stat = stat; - this.checksum4WriteBlock = checksum; - this.cachingStrategy = cachingStrategy; - this.byteArrayManager = byteArrayManage; - this.isLazyPersistFile = isLazyPersist(stat); - this.isAppend = isAppend; - this.favoredNodes = favoredNodes; - - final DfsClientConf conf = dfsClient.getConf(); - this.dfsclientSlowLogThresholdMs = conf.getSlowIoWarningThresholdMs(); - 
this.excludedNodes = initExcludedNodes(conf.getExcludedNodesCacheExpiry()); - this.errorState = new ErrorState(conf.getDatanodeRestartTimeout()); - } - - /** - * construction with tracing info - */ - DataStreamer(HdfsFileStatus stat, ExtendedBlock block, DFSClient dfsClient, - String src, Progressable progress, DataChecksum checksum, - AtomicReference<CachingStrategy> cachingStrategy, - ByteArrayManager byteArrayManage, String[] favoredNodes) { - this(stat, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage, false, favoredNodes); - this.block = block; - stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; - } - - /** - * Construct a data streamer for appending to the last partial block - * @param lastBlock last block of the file to be appended - * @param stat status of the file to be appended - * @throws IOException if error occurs - */ - DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat, DFSClient dfsClient, - String src, Progressable progress, DataChecksum checksum, - AtomicReference<CachingStrategy> cachingStrategy, - ByteArrayManager byteArrayManage) throws IOException { - this(stat, dfsClient, src, progress, checksum, cachingStrategy, - byteArrayManage, true, null); - stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; - block = lastBlock.getBlock(); - bytesSent = block.getNumBytes(); - accessToken = lastBlock.getBlockToken(); - } - - /** - * Set pipeline in construction - * - * @param lastBlock the last block of a file - * @throws IOException - */ - void setPipelineInConstruction(LocatedBlock lastBlock) throws IOException{ - // setup pipeline to append to the last block XXX retries?? 
- setPipeline(lastBlock); - if (nodes.length < 1) { - throw new IOException("Unable to retrieve blocks locations " + - " for last block " + block + - "of file " + src); - } - } - - private void setPipeline(LocatedBlock lb) { - setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs()); - } - - private void setPipeline(DatanodeInfo[] nodes, StorageType[] storageTypes, - String[] storageIDs) { - this.nodes = nodes; - this.storageTypes = storageTypes; - this.storageIDs = storageIDs; - } - - /** - * Initialize for data streaming - */ - private void initDataStreaming() { - this.setName("DataStreamer for file " + src + - " block " + block); - response = new ResponseProcessor(nodes); - response.start(); - stage = BlockConstructionStage.DATA_STREAMING; - } - - private void endBlock() { - if(LOG.isDebugEnabled()) { - LOG.debug("Closing old block " + block); - } - this.setName("DataStreamer for file " + src); - closeResponder(); - closeStream(); - setPipeline(null, null, null); - stage = BlockConstructionStage.PIPELINE_SETUP_CREATE; - } - - private boolean shouldStop() { - return streamerClosed || errorState.hasError() || !dfsClient.clientRunning; - } - - /* - * streamer thread is the only thread that opens streams to datanode, - * and closes them. Any error recovery is also done by this thread. 
- */ - @Override - public void run() { - long lastPacket = Time.monotonicNow(); - TraceScope scope = NullScope.INSTANCE; - while (!streamerClosed && dfsClient.clientRunning) { - // if the Responder encountered an error, shutdown Responder - if (errorState.hasError() && response != null) { - try { - response.close(); - response.join(); - response = null; - } catch (InterruptedException e) { - LOG.warn("Caught exception", e); - } - } - - DFSPacket one; - try { - // process datanode IO errors if any - boolean doSleep = processDatanodeError(); - - final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2; - synchronized (dataQueue) { - // wait for a packet to be sent. - long now = Time.monotonicNow(); - while ((!shouldStop() && dataQueue.size() == 0 && - (stage != BlockConstructionStage.DATA_STREAMING || - stage == BlockConstructionStage.DATA_STREAMING && - now - lastPacket < halfSocketTimeout)) || doSleep ) { - long timeout = halfSocketTimeout - (now-lastPacket); - timeout = timeout <= 0 ? 1000 : timeout; - timeout = (stage == BlockConstructionStage.DATA_STREAMING)? - timeout : 1000; - try { - dataQueue.wait(timeout); - } catch (InterruptedException e) { - LOG.warn("Caught exception", e); - } - doSleep = false; - now = Time.monotonicNow(); - } - if (shouldStop()) { - continue; - } - // get packet to be sent. - if (dataQueue.isEmpty()) { - one = createHeartbeatPacket(); - } else { - try { - backOffIfNecessary(); - } catch (InterruptedException e) { - LOG.warn("Caught exception", e); - } - one = dataQueue.getFirst(); // regular data packet - long parents[] = one.getTraceParents(); - if (parents.length > 0) { - scope = Trace.startSpan("dataStreamer", new TraceInfo(0, parents[0])); - // TODO: use setParents API once it's available from HTrace 3.2 - // scope = Trace.startSpan("dataStreamer", Sampler.ALWAYS); - // scope.getSpan().setParents(parents); - } - } - } - - // get new block from namenode. 
- if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { - if(LOG.isDebugEnabled()) { - LOG.debug("Allocating new block"); - } - setPipeline(nextBlockOutputStream()); - initDataStreaming(); - } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) { - if(LOG.isDebugEnabled()) { - LOG.debug("Append to block " + block); - } - setupPipelineForAppendOrRecovery(); - if (streamerClosed) { - continue; - } - initDataStreaming(); - } - - long lastByteOffsetInBlock = one.getLastByteOffsetBlock(); - if (lastByteOffsetInBlock > stat.getBlockSize()) { - throw new IOException("BlockSize " + stat.getBlockSize() + - " is smaller than data size. " + - " Offset of packet in block " + - lastByteOffsetInBlock + - " Aborting file " + src); - } - - if (one.isLastPacketInBlock()) { - // wait for all data packets have been successfully acked - synchronized (dataQueue) { - while (!shouldStop() && ackQueue.size() != 0) { - try { - // wait for acks to arrive from datanodes - dataQueue.wait(1000); - } catch (InterruptedException e) { - LOG.warn("Caught exception", e); - } - } - } - if (shouldStop()) { - continue; - } - stage = BlockConstructionStage.PIPELINE_CLOSE; - } - - // send the packet - Span span = null; - synchronized (dataQueue) { - // move packet from dataQueue to ackQueue - if (!one.isHeartbeatPacket()) { - span = scope.detach(); - one.setTraceSpan(span); - dataQueue.removeFirst(); - ackQueue.addLast(one); - dataQueue.notifyAll(); - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("DataStreamer block " + block + - " sending packet " + one); - } - - // write out data to remote datanode - TraceScope writeScope = Trace.startSpan("writeTo", span); - try { - one.writeTo(blockStream); - blockStream.flush(); - } catch (IOException e) { - // HDFS-3398 treat primary DN is down since client is unable to - // write to primary DN. If a failed or restarting node has already - // been recorded by the responder, the following call will have no - // effect. 
Pipeline recovery can handle only one node error at a - // time. If the primary node fails again during the recovery, it - // will be taken out then. - errorState.markFirstNodeIfNotMarked(); - throw e; - } finally { - writeScope.close(); - } - lastPacket = Time.monotonicNow(); - - // update bytesSent - long tmpBytesSent = one.getLastByteOffsetBlock(); - if (bytesSent < tmpBytesSent) { - bytesSent = tmpBytesSent; - } - - if (shouldStop()) { - continue; - } - - // Is this block full? - if (one.isLastPacketInBlock()) { - // wait for the close packet has been acked - synchronized (dataQueue) { - while (!shouldStop() && ackQueue.size() != 0) { - dataQueue.wait(1000);// wait for acks to arrive from datanodes - } - } - if (shouldStop()) { - continue; - } - - endBlock(); - } - if (progress != null) { progress.progress(); } - - // This is used by unit test to trigger race conditions. - if (artificialSlowdown != 0 && dfsClient.clientRunning) { - Thread.sleep(artificialSlowdown); - } - } catch (Throwable e) { - // Log warning if there was a real error. - if (!errorState.isRestartingNode()) { - // Since their messages are descriptive enough, do not always - // log a verbose stack-trace WARN for quota exceptions. 
- if (e instanceof QuotaExceededException) { - LOG.debug("DataStreamer Quota Exception", e); - } else { - LOG.warn("DataStreamer Exception", e); - } - } - lastException.set(e); - assert !(e instanceof NullPointerException); - errorState.setError(true); - if (!errorState.isNodeMarked()) { - // Not a datanode issue - streamerClosed = true; - } - } finally { - scope.close(); - } - } - closeInternal(); - } - - private void closeInternal() { - closeResponder(); // close and join - closeStream(); - streamerClosed = true; - release(); - synchronized (dataQueue) { - dataQueue.notifyAll(); - } - } - - /** - * release the DFSPackets in the two queues - * - */ - void release() { - synchronized (dataQueue) { - releaseBuffer(dataQueue, byteArrayManager); - releaseBuffer(ackQueue, byteArrayManager); - } - } - - /** - * wait for the ack of seqno - * - * @param seqno the sequence number to be acked - * @throws IOException - */ - void waitForAckedSeqno(long seqno) throws IOException { - TraceScope scope = Trace.startSpan("waitForAckedSeqno", Sampler.NEVER); - try { - if (LOG.isDebugEnabled()) { - LOG.debug("Waiting for ack for: " + seqno); - } - long begin = Time.monotonicNow(); - try { - synchronized (dataQueue) { - while (!streamerClosed) { - checkClosed(); - if (lastAckedSeqno >= seqno) { - break; - } - try { - dataQueue.wait(1000); // when we receive an ack, we notify on - // dataQueue - } catch (InterruptedException ie) { - throw new InterruptedIOException( - "Interrupted while waiting for data to be acknowledged by pipeline"); - } - } - } - checkClosed(); - } catch (ClosedChannelException e) { - } - long duration = Time.monotonicNow() - begin; - if (duration > dfsclientSlowLogThresholdMs) { - LOG.warn("Slow waitForAckedSeqno took " + duration - + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms)"); - } - } finally { - scope.close(); - } - } - - /** - * wait for space of dataQueue and queue the packet - * - * @param packet the DFSPacket to be queued - * @throws IOException 
- */ - void waitAndQueuePacket(DFSPacket packet) throws IOException { - synchronized (dataQueue) { - try { - // If queue is full, then wait till we have enough space - boolean firstWait = true; - try { - while (!streamerClosed && dataQueue.size() + ackQueue.size() > - dfsClient.getConf().getWriteMaxPackets()) { - if (firstWait) { - Span span = Trace.currentSpan(); - if (span != null) { - span.addTimelineAnnotation("dataQueue.wait"); - } - firstWait = false; - } - try { - dataQueue.wait(); - } catch (InterruptedException e) { - // If we get interrupted while waiting to queue data, we still need to get rid - // of the current packet. This is because we have an invariant that if - // currentPacket gets full, it will get queued before the next writeChunk. - // - // Rather than wait around for space in the queue, we should instead try to - // return to the caller as soon as possible, even though we slightly overrun - // the MAX_PACKETS length. - Thread.currentThread().interrupt(); - break; - } - } - } finally { - Span span = Trace.currentSpan(); - if ((span != null) && (!firstWait)) { - span.addTimelineAnnotation("end.wait"); - } - } - checkClosed(); - queuePacket(packet); - } catch (ClosedChannelException e) { - } - } - } - - /* - * close the streamer, should be called only by an external thread - * and only after all data to be sent has been flushed to datanode. 
- * - * Interrupt this data streamer if force is true - * - * @param force if this data stream is forced to be closed - */ - void close(boolean force) { - streamerClosed = true; - synchronized (dataQueue) { - dataQueue.notifyAll(); - } - if (force) { - this.interrupt(); - } - } - - - private void checkClosed() throws IOException { - if (streamerClosed) { - lastException.throwException4Close(); - } - } - - private void closeResponder() { - if (response != null) { - try { - response.close(); - response.join(); - } catch (InterruptedException e) { - LOG.warn("Caught exception", e); - } finally { - response = null; - } - } - } - - private void closeStream() { - final MultipleIOException.Builder b = new MultipleIOException.Builder(); - - if (blockStream != null) { - try { - blockStream.close(); - } catch (IOException e) { - b.add(e); - } finally { - blockStream = null; - } - } - if (blockReplyStream != null) { - try { - blockReplyStream.close(); - } catch (IOException e) { - b.add(e); - } finally { - blockReplyStream = null; - } - } - if (null != s) { - try { - s.close(); - } catch (IOException e) { - b.add(e); - } finally { - s = null; - } - } - - final IOException ioe = b.build(); - if (ioe != null) { - lastException.set(ioe); - } - } - - /** - * Examine whether it is worth waiting for a node to restart. - * @param index the node index - */ - boolean shouldWaitForRestart(int index) { - // Only one node in the pipeline. - if (nodes.length == 1) { - return true; - } - - // Is it a local node? - InetAddress addr = null; - try { - addr = InetAddress.getByName(nodes[index].getIpAddr()); - } catch (java.net.UnknownHostException e) { - // we are passing an ip address. this should not happen. - assert false; - } - - if (addr != null && NetUtils.isLocalAddress(addr)) { - return true; - } - return false; - } - - // - // Processes responses from the datanodes. A packet is removed - // from the ackQueue when its response arrives. 
- // - private class ResponseProcessor extends Daemon { - - private volatile boolean responderClosed = false; - private DatanodeInfo[] targets = null; - private boolean isLastPacketInBlock = false; - - ResponseProcessor (DatanodeInfo[] targets) { - this.targets = targets; - } - - @Override - public void run() { - - setName("ResponseProcessor for block " + block); - PipelineAck ack = new PipelineAck(); - - TraceScope scope = NullScope.INSTANCE; - while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) { - // process responses from datanodes. - try { - // read an ack from the pipeline - long begin = Time.monotonicNow(); - ack.readFields(blockReplyStream); - long duration = Time.monotonicNow() - begin; - if (duration > dfsclientSlowLogThresholdMs - && ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) { - LOG.warn("Slow ReadProcessor read fields took " + duration - + "ms (threshold=" + dfsclientSlowLogThresholdMs + "ms); ack: " - + ack + ", targets: " + Arrays.asList(targets)); - } else if (LOG.isDebugEnabled()) { - LOG.debug("DFSClient " + ack); - } - - long seqno = ack.getSeqno(); - // processes response status from datanodes. - ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>(); - for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) { - final Status reply = PipelineAck.getStatusFromHeader(ack - .getHeaderFlag(i)); - if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) == - PipelineAck.ECN.CONGESTED) { - congestedNodesFromAck.add(targets[i]); - } - // Restart will not be treated differently unless it is - // the local node or the only one in the pipeline. 
- if (PipelineAck.isRestartOOBStatus(reply) && - shouldWaitForRestart(i)) { - final String message = "Datanode " + i + " is restarting: " - + targets[i]; - errorState.initRestartingNode(i, message); - throw new IOException(message); - } - // node error - if (reply != SUCCESS) { - errorState.setBadNodeIndex(i); // mark bad datanode - throw new IOException("Bad response " + reply + - " for " + block + " from datanode " + targets[i]); - } - } - - if (!congestedNodesFromAck.isEmpty()) { - synchronized (congestedNodes) { - congestedNodes.clear(); - congestedNodes.addAll(congestedNodesFromAck); - } - } else { - synchronized (congestedNodes) { - congestedNodes.clear(); - lastCongestionBackoffTime = 0; - } - } - - assert seqno != PipelineAck.UNKOWN_SEQNO : - "Ack for unknown seqno should be a failed ack: " + ack; - if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack - continue; - } - - // a success ack for a data packet - DFSPacket one; - synchronized (dataQueue) { - one = ackQueue.getFirst(); - } - if (one.getSeqno() != seqno) { - throw new IOException("ResponseProcessor: Expecting seqno " + - " for block " + block + - one.getSeqno() + " but received " + seqno); - } - isLastPacketInBlock = one.isLastPacketInBlock(); - - // Fail the packet write for testing in order to force a - // pipeline recovery. 
- if (DFSClientFaultInjector.get().failPacket() && - isLastPacketInBlock) { - failPacket = true; - throw new IOException( - "Failing the last packet for testing."); - } - - // update bytesAcked - block.setNumBytes(one.getLastByteOffsetBlock()); - - synchronized (dataQueue) { - scope = Trace.continueSpan(one.getTraceSpan()); - one.setTraceSpan(null); - lastAckedSeqno = seqno; - ackQueue.removeFirst(); - dataQueue.notifyAll(); - - one.releaseBuffer(byteArrayManager); - } - } catch (Exception e) { - if (!responderClosed) { - lastException.set(e); - errorState.setError(true); - errorState.markFirstNodeIfNotMarked(); - synchronized (dataQueue) { - dataQueue.notifyAll(); - } - if (!errorState.isRestartingNode()) { - LOG.warn("Exception for " + block, e); - } - responderClosed = true; - } - } finally { - scope.close(); - } - } - } - - void close() { - responderClosed = true; - this.interrupt(); - } - } - - /** - * If this stream has encountered any errors, shutdown threads - * and mark the stream as closed. - * - * @return true if it should sleep for a while after returning. - */ - private boolean processDatanodeError() throws IOException { - if (!errorState.hasDatanodeError()) { - return false; - } - if (response != null) { - LOG.info("Error Recovery for " + block + - " waiting for responder to exit. "); - return true; - } - closeStream(); - - // move packets from ack queue to front of the data queue - synchronized (dataQueue) { - dataQueue.addAll(0, ackQueue); - ackQueue.clear(); - } - - // Record the new pipeline failure recovery. - if (lastAckedSeqnoBeforeFailure != lastAckedSeqno) { - lastAckedSeqnoBeforeFailure = lastAckedSeqno; - pipelineRecoveryCount = 1; - } else { - // If we had to recover the pipeline five times in a row for the - // same packet, this client likely has corrupt data or corrupting - // during transmission. - if (++pipelineRecoveryCount > 5) { - LOG.warn("Error recovering pipeline for writing " + - block + ". 
Already retried 5 times for the same packet."); - lastException.set(new IOException("Failing write. Tried pipeline " + - "recovery 5 times without success.")); - streamerClosed = true; - return false; - } - } - boolean doSleep = setupPipelineForAppendOrRecovery(); - - if (!streamerClosed && dfsClient.clientRunning) { - if (stage == BlockConstructionStage.PIPELINE_CLOSE) { - - // If we had an error while closing the pipeline, we go through a fast-path - // where the BlockReceiver does not run. Instead, the DataNode just finalizes - // the block immediately during the 'connect ack' process. So, we want to pull - // the end-of-block packet from the dataQueue, since we don't actually have - // a true pipeline to send it over. - // - // We also need to set lastAckedSeqno to the end-of-block Packet's seqno, so that - // a client waiting on close() will be aware that the flush finished. - synchronized (dataQueue) { - DFSPacket endOfBlockPacket = dataQueue.remove(); // remove the end of block packet - Span span = endOfBlockPacket.getTraceSpan(); - if (span != null) { - // Close any trace span associated with this Packet - TraceScope scope = Trace.continueSpan(span); - scope.close(); - } - assert endOfBlockPacket.isLastPacketInBlock(); - assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1; - lastAckedSeqno = endOfBlockPacket.getSeqno(); - dataQueue.notifyAll(); - } - endBlock(); - } else { - initDataStreaming(); - } - } - - return doSleep; - } - - void setHflush() { - isHflushed = true; - } - - private int findNewDatanode(final DatanodeInfo[] original - ) throws IOException { - if (nodes.length != original.length + 1) { - throw new IOException( - new StringBuilder() - .append("Failed to replace a bad datanode on the existing pipeline ") - .append("due to no more good datanodes being available to try. ") - .append("(Nodes: current=").append(Arrays.asList(nodes)) - .append(", original=").append(Arrays.asList(original)).append("). 
") - .append("The current failed datanode replacement policy is ") - .append(dfsClient.dtpReplaceDatanodeOnFailure).append(", and ") - .append("a client may configure this via '") - .append(BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY) - .append("' in its configuration.") - .toString()); - } - for(int i = 0; i < nodes.length; i++) { - int j = 0; - for(; j < original.length && !nodes[i].equals(original[j]); j++); - if (j == original.length) { - return i; - } - } - throw new IOException("Failed: new datanode not found: nodes=" - + Arrays.asList(nodes) + ", original=" + Arrays.asList(original)); - } - - private void addDatanode2ExistingPipeline() throws IOException { - if (DataTransferProtocol.LOG.isDebugEnabled()) { - DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno); - } - /* - * Is data transfer necessary? We have the following cases. - * - * Case 1: Failure in Pipeline Setup - * - Append - * + Transfer the stored replica, which may be a RBW or a finalized. - * - Create - * + If no data, then no transfer is required. - * + If there are data written, transfer RBW. This case may happens - * when there are streaming failure earlier in this pipeline. - * - * Case 2: Failure in Streaming - * - Append/Create: - * + transfer RBW - * - * Case 3: Failure in Close - * - Append/Create: - * + no transfer, let NameNode replicates the block. 
- */ - if (!isAppend && lastAckedSeqno < 0 - && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) { - //no data have been written - return; - } else if (stage == BlockConstructionStage.PIPELINE_CLOSE - || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) { - //pipeline is closing - return; - } - - //get a new datanode - final DatanodeInfo[] original = nodes; - final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode( - src, stat.getFileId(), block, nodes, storageIDs, - failed.toArray(new DatanodeInfo[failed.size()]), - 1, dfsClient.clientName); - setPipeline(lb); - - //find the new datanode - final int d = findNewDatanode(original); - - //transfer replica - final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1]; - final DatanodeInfo[] targets = {nodes[d]}; - final StorageType[] targetStorageTypes = {storageTypes[d]}; - transfer(src, targets, targetStorageTypes, lb.getBlockToken()); - } - - private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets, - final StorageType[] targetStorageTypes, - final Token<BlockTokenIdentifier> blockToken) throws IOException { - //transfer replica to the new datanode - Socket sock = null; - DataOutputStream out = null; - DataInputStream in = null; - try { - sock = createSocketForPipeline(src, 2, dfsClient); - final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2); - final long readTimeout = dfsClient.getDatanodeReadTimeout(2); - - OutputStream unbufOut = NetUtils.getOutputStream(sock, writeTimeout); - InputStream unbufIn = NetUtils.getInputStream(sock, readTimeout); - IOStreamPair saslStreams = dfsClient.saslClient.socketSend(sock, - unbufOut, unbufIn, dfsClient, blockToken, src); - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - DFSUtil.getSmallBufferSize(dfsClient.getConfiguration()))); - in = new DataInputStream(unbufIn); - - //send the TRANSFER_BLOCK request - new Sender(out).transferBlock(block, blockToken, 
dfsClient.clientName, - targets, targetStorageTypes); - out.flush(); - - //ack - BlockOpResponseProto response = - BlockOpResponseProto.parseFrom(PBHelperClient.vintPrefixed(in)); - if (SUCCESS != response.getStatus()) { - throw new IOException("Failed to add a datanode"); - } - } finally { - IOUtils.closeStream(in); - IOUtils.closeStream(out); - IOUtils.closeSocket(sock); - } - } - - /** - * Open a DataStreamer to a DataNode pipeline so that - * it can be written to. - * This happens when a file is appended or data streaming fails - * It keeps on trying until a pipeline is setup - */ - private boolean setupPipelineForAppendOrRecovery() throws IOException { - // check number of datanodes - if (nodes == null || nodes.length == 0) { - String msg = "Could not get block locations. " + "Source file \"" - + src + "\" - Aborting..."; - LOG.warn(msg); - lastException.set(new IOException(msg)); - streamerClosed = true; - return false; - } - - boolean success = false; - long newGS = 0L; - while (!success && !streamerClosed && dfsClient.clientRunning) { - if (!handleRestartingDatanode()) { - return false; - } - - final boolean isRecovery = errorState.hasError(); - if (!handleBadDatanode()) { - return false; - } - - handleDatanodeReplacement(); - - // get a new generation stamp and an access token - final LocatedBlock lb = updateBlockForPipeline(); - newGS = lb.getBlock().getGenerationStamp(); - accessToken = lb.getBlockToken(); - - // set up the pipeline again with the remaining nodes - success = createBlockOutputStream(nodes, storageTypes, newGS, isRecovery); - - failPacket4Testing(); - - errorState.checkRestartingNodeDeadline(nodes); - } // while - - if (success) { - block = updatePipeline(newGS); - } - return false; // do not sleep, continue processing - } - - /** - * Sleep if a node is restarting. - * This process is repeated until the deadline or the node starts back up. - * @return true if it should continue. 
- */ - private boolean handleRestartingDatanode() { - if (errorState.isRestartingNode()) { - // 4 seconds or the configured deadline period, whichever is shorter. - // This is the retry interval and recovery will be retried in this - // interval until timeout or success. - final long delay = Math.min(errorState.datanodeRestartTimeout, 4000L); - try { - Thread.sleep(delay); - } catch (InterruptedException ie) { - lastException.set(new IOException( - "Interrupted while waiting for restarting " - + nodes[errorState.getRestartingNodeIndex()])); - streamerClosed = true; - return false; - } - } - return true; - } - - /** - * Remove bad node from list of nodes if badNodeIndex was set. - * @return true if it should continue. - */ - private boolean handleBadDatanode() { - final int badNodeIndex = errorState.getBadNodeIndex(); - if (badNodeIndex >= 0) { - if (nodes.length <= 1) { - lastException.set(new IOException("All datanodes " - + Arrays.toString(nodes) + " are bad. Aborting...")); - streamerClosed = true; - return false; - } - - LOG.warn("Error Recovery for " + block + " in pipeline " - + Arrays.toString(nodes) + ": datanode " + badNodeIndex - + "("+ nodes[badNodeIndex] + ") is bad."); - failed.add(nodes[badNodeIndex]); - - DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1]; - arraycopy(nodes, newnodes, badNodeIndex); - - final StorageType[] newStorageTypes = new StorageType[newnodes.length]; - arraycopy(storageTypes, newStorageTypes, badNodeIndex); - - final String[] newStorageIDs = new String[newnodes.length]; - arraycopy(storageIDs, newStorageIDs, badNodeIndex); - - setPipeline(newnodes, newStorageTypes, newStorageIDs); - - errorState.adjustState4RestartingNode(); - lastException.clear(); - } - return true; - } - - /** Add a datanode if replace-datanode policy is satisfied. 
*/ - private void handleDatanodeReplacement() throws IOException { - if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(stat.getReplication(), - nodes, isAppend, isHflushed)) { - try { - addDatanode2ExistingPipeline(); - } catch(IOException ioe) { - if (!dfsClient.dtpReplaceDatanodeOnFailure.isBestEffort()) { - throw ioe; - } - LOG.warn("Failed to replace datanode." - + " Continue with the remaining datanodes since " - + BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY - + " is set to true.", ioe); - } - } - } - - private void failPacket4Testing() { - if (failPacket) { // for testing - failPacket = false; - try { - // Give DNs time to send in bad reports. In real situations, - // good reports should follow bad ones, if client committed - // with those nodes. - Thread.sleep(2000); - } catch (InterruptedException ie) {} - } - } - - LocatedBlock updateBlockForPipeline() throws IOException { - return dfsClient.namenode.updateBlockForPipeline( - block, dfsClient.clientName); - } - - /** update pipeline at the namenode */ - ExtendedBlock updatePipeline(long newGS) throws IOException { - final ExtendedBlock newBlock = new ExtendedBlock( - block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS); - dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, - nodes, storageIDs); - return newBlock; - } - - /** - * Open a DataStreamer to a DataNode so that it can be written to. - * This happens when a file is created and each time a new block is allocated. - * Must get block ID and the IDs of the destinations from the namenode. - * Returns the list of target datanodes. 
- */ - private LocatedBlock nextBlockOutputStream() throws IOException { - LocatedBlock lb = null; - DatanodeInfo[] nodes = null; - StorageType[] storageTypes = null; - int count = dfsClient.getConf().getNumBlockWriteRetry(); - boolean success = false; - ExtendedBlock oldBlock = block; - do { - errorState.reset(); - lastException.clear(); - success = false; - - DatanodeInfo[] excluded = - excludedNodes.getAllPresent(excludedNodes.asMap().keySet()) - .keySet() - .toArray(new DatanodeInfo[0]); - block = oldBlock; - lb = locateFollowingBlock(excluded.length > 0 ? excluded : null); - block = lb.getBlock(); - block.setNumBytes(0); - bytesSent = 0; - accessToken = lb.getBlockToken(); - nodes = lb.getLocations(); - storageTypes = lb.getStorageTypes(); - - // - // Connect to first DataNode in the list. - // - success = createBlockOutputStream(nodes, storageTypes, 0L, false); - - if (!success) { - LOG.info("Abandoning " + block); - dfsClient.namenode.abandonBlock(block, stat.getFileId(), src, - dfsClient.clientName); - block = null; - final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; - LOG.info("Excluding datanode " + badNode); - excludedNodes.put(badNode, badNode); - } - } while (!success && --count >= 0); - - if (!success) { - throw new IOException("Unable to create new block."); - } - return lb; - } - - // connects to the first datanode in the pipeline - // Returns true if success, otherwise return failure. 
- // - private boolean createBlockOutputStream(DatanodeInfo[] nodes, - StorageType[] nodeStorageTypes, long newGS, boolean recoveryFlag) { - if (nodes.length == 0) { - LOG.info("nodes are empty for write pipeline of " + block); - return false; - } - Status pipelineStatus = SUCCESS; - String firstBadLink = ""; - boolean checkRestart = false; - if (LOG.isDebugEnabled()) { - LOG.debug("pipeline = " + Arrays.asList(nodes)); - } - - // persist blocks on namenode on next flush - persistBlocks.set(true); - - int refetchEncryptionKey = 1; - while (true) { - boolean result = false; - DataOutputStream out = null; - try { - assert null == s : "Previous socket unclosed"; - assert null == blockReplyStream : "Previous blockReplyStream unclosed"; - s = createSocketForPipeline(nodes[0], nodes.length, dfsClient); - long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length); - long readTimeout = dfsClient.getDatanodeReadTimeout(nodes.length); - - OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout); - InputStream unbufIn = NetUtils.getInputStream(s, readTimeout); - IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s, - unbufOut, unbufIn, dfsClient, accessToken, nodes[0]); - unbufOut = saslStreams.out; - unbufIn = saslStreams.in; - out = new DataOutputStream(new BufferedOutputStream(unbufOut, - DFSUtil.getSmallBufferSize(dfsClient.getConfiguration()))); - blockReplyStream = new DataInputStream(unbufIn); - - // - // Xmit header info to datanode - // - - BlockConstructionStage bcs = recoveryFlag? stage.getRecoveryStage(): stage; - - // We cannot change the block length in 'block' as it counts the number - // of bytes ack'ed. 
- ExtendedBlock blockCopy = new ExtendedBlock(block); - blockCopy.setNumBytes(stat.getBlockSize()); - - boolean[] targetPinnings = getPinnings(nodes, true); - // send the request - new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken, - dfsClient.clientName, nodes, nodeStorageTypes, null, bcs, - nodes.length, block.getNumBytes(), bytesSent, newGS, - checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile, - (targetPinnings == null ? false : targetPinnings[0]), targetPinnings); - - // receive ack for connect - BlockOpResponseProto resp = BlockOpResponseProto.parseFrom( - PBHelperClient.vintPrefixed(blockReplyStream)); - pipelineStatus = resp.getStatus(); - firstBadLink = resp.getFirstBadLink(); - - // Got an restart OOB ack. - // If a node is already restarting, this status is not likely from - // the same node. If it is from a different node, it is not - // from the local datanode. Thus it is safe to treat this as a - // regular node error. - if (PipelineAck.isRestartOOBStatus(pipelineStatus) && - !errorState.isRestartingNode()) { - checkRestart = true; - throw new IOException("A datanode is restarting."); - } - - String logInfo = "ack with firstBadLink as " + firstBadLink; - DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo); - - assert null == blockStream : "Previous blockStream unclosed"; - blockStream = out; - result = true; // success - errorState.reset(); - } catch (IOException ie) { - if (!errorState.isRestartingNode()) { - LOG.info("Exception in createBlockOutputStream", ie); - } - if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) { - LOG.info("Will fetch a new encryption key and retry, " - + "encryption key was invalid when connecting to " - + nodes[0] + " : " + ie); - // The encryption key used is invalid. - refetchEncryptionKey--; - dfsClient.clearDataEncryptionKey(); - // Don't close the socket/exclude this node just yet. Try again with - // a new encryption key. 
- continue; - } - - // find the datanode that matches - if (firstBadLink.length() != 0) { - for (int i = 0; i < nodes.length; i++) { - // NB: Unconditionally using the xfer addr w/o hostname - if (firstBadLink.equals(nodes[i].getXferAddr())) { - errorState.setBadNodeIndex(i); - break; - } - } - } else { - assert checkRestart == false; - errorState.setBadNodeIndex(0); - } - - final int i = errorState.getBadNodeIndex(); - // Check whether there is a restart worth waiting for. - if (checkRestart && shouldWaitForRestart(i)) { - errorState.initRestartingNode(i, "Datanode " + i + " is restarting: " + nodes[i]); - } - errorState.setError(true); - lastException.set(ie); - result = false; // error - } finally { - if (!result) { - IOUtils.closeSocket(s); - s = null; - IOUtils.closeStream(out); - out = null; - IOUtils.closeStream(blockReplyStream); - blockReplyStream = null; - } - } - return result; - } - } - - private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) { - if (favoredNodes == null) { - return null; - } else { - boolean[] pinnings = new boolean[nodes.length]; - HashSet<String> favoredSet = - new HashSet<String>(Arrays.asList(favoredNodes)); - for (int i = 0; i < nodes.length; i++) { - pinnings[i] = favoredSet.remove(nodes[i].getXferAddrWithHostname()); - if (LOG.isDebugEnabled()) { - LOG.debug(nodes[i].getXferAddrWithHostname() + - " was chosen by name node (favored=" + pinnings[i] + ")."); - } - } - if (shouldLog && !favoredSet.isEmpty()) { - // There is one or more favored nodes that were not allocated. 
- LOG.warn("These favored nodes were specified but not chosen: " - + favoredSet + " Specified favored nodes: " - + Arrays.toString(favoredNodes)); - - } - return pinnings; - } - } - - protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) - throws IOException { - final DfsClientConf conf = dfsClient.getConf(); - int retries = conf.getNumBlockWriteLocateFollowingRetry(); - long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs(); - while (true) { - long localstart = Time.monotonicNow(); - while (true) { - try { - return dfsClient.namenode.addBlock(src, dfsClient.clientName, - block, excludedNodes, stat.getFileId(), favoredNodes); - } catch (RemoteException e) { - IOException ue = - e.unwrapRemoteException(FileNotFoundException.class, - AccessControlException.class, - NSQuotaExceededException.class, - DSQuotaExceededException.class, - QuotaByStorageTypeExceededException.class, - UnresolvedPathException.class); - if (ue != e) { - throw ue; // no need to retry these exceptions - } - - - if (NotReplicatedYetException.class.getName(). - equals(e.getClassName())) { - if (retries == 0) { - throw e; - } else { - --retries; - LOG.info("Exception while adding a block", e); - long elapsed = Time.monotonicNow() - localstart; - if (elapsed > 5000) { - LOG.info("Waiting for replication for " - + (elapsed / 1000) + " seconds"); - } - try { - LOG.warn("NotReplicatedYetException sleeping " + src - + " retries left " + retries); - Thread.sleep(sleeptime); - sleeptime *= 2; - } catch (InterruptedException ie) { - LOG.warn("Caught exception", ie); - } - } - } else { - throw e; - } - - } - } - } - } - - /** - * This function sleeps for a certain amount of time when the writing - * pipeline is congested. The function calculates the time based on a - * decorrelated filter. 
- * - * @see - * <a href="http://www.awsarchitectureblog.com/2015/03/backoff.html"> - * http://www.awsarchitectureblog.com/2015/03/backoff.html</a> - */ - private void backOffIfNecessary() throws InterruptedException { - int t = 0; - synchronized (congestedNodes) { - if (!congestedNodes.isEmpty()) { - StringBuilder sb = new StringBuilder("DataNode"); - for (DatanodeInfo i : congestedNodes) { - sb.append(' ').append(i); - } - int range = Math.abs(lastCongestionBackoffTime * 3 - - CONGESTION_BACKOFF_MEAN_TIME_IN_MS); - int base = Math.min(lastCongestionBackoffTime * 3, - CONGESTION_BACKOFF_MEAN_TIME_IN_MS); - t = Math.min(CONGESTION_BACK_OFF_MAX_TIME_IN_MS, - (int)(base + Math.random() * range)); - lastCongestionBackoffTime = t; - sb.append(" are congested. Backing off for ").append(t).append(" ms"); - LOG.info(sb.toString()); - congestedNodes.clear(); - } - } - if (t != 0) { - Thread.sleep(t); - } - } - - /** - * get the block this streamer is writing to - * - * @return the block this streamer is writing to - */ - ExtendedBlock getBlock() { - return block; - } - - /** - * return the target datanodes in the pipeline - * - * @return the target datanodes in the pipeline - */ - DatanodeInfo[] getNodes() { - return nodes; - } - - /** - * return the token of the block - * - * @return the token of the block - */ - Token<BlockTokenIdentifier> getBlockToken() { - return accessToken; - } - - /** - * Put a packet to the data queue - * - * @param packet the packet to be put into the data queued - */ - void queuePacket(DFSPacket packet) { - synchronized (dataQueue) { - if (packet == null) return; - packet.addTraceParent(Trace.currentSpan()); - dataQueue.addLast(packet); - lastQueuedSeqno = packet.getSeqno(); - if (LOG.isDebugEnabled()) { - LOG.debug("Queued packet " + packet.getSeqno()); - } - dataQueue.notifyAll(); - } - } - - /** - * For heartbeat packets, create buffer directly by new byte[] - * since heartbeats should not be blocked. 
- */ - private DFSPacket createHeartbeatPacket() throws InterruptedIOException { - final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN]; - return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO, 0, false); - } - - private static LoadingCache<DatanodeInfo, DatanodeInfo> initExcludedNodes( - long excludedNodesCacheExpiry) { - return CacheBuilder.newBuilder() - .expireAfterWrite(excludedNodesCacheExpiry, TimeUnit.MILLISECONDS) - .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() { - @Override - public void onRemoval( - RemovalNotification<DatanodeInfo, DatanodeInfo> notification) { - LOG.info("Removing node " + notification.getKey() - + " from the excluded nodes list"); - } - }).build(new CacheLoader<DatanodeInfo, DatanodeInfo>() { - @Override - public DatanodeInfo load(DatanodeInfo key) throws Exception { - return key; - } - }); - } - - private static <T> void arraycopy(T[] srcs, T[] dsts, int skipIndex) { - System.arraycopy(srcs, 0, dsts, 0, skipIndex); - System.arraycopy(srcs, skipIndex+1, dsts, skipIndex, dsts.length-skipIndex); - } - - /** - * check if to persist blocks on namenode - * - * @return if to persist blocks on namenode - */ - AtomicBoolean getPersistBlocks(){ - return persistBlocks; - } - - /** - * check if to append a chunk - * - * @param appendChunk if to append a chunk - */ - void setAppendChunk(boolean appendChunk){ - this.appendChunk = appendChunk; - } - - /** - * get if to append a chunk - * - * @return if to append a chunk - */ - boolean getAppendChunk(){ - return appendChunk; - } - - /** - * @return the last exception - */ - LastExceptionInStreamer getLastException(){ - return lastException; - } - - /** - * set socket to null - */ - void setSocketToNull() { - this.s = null; - } - - /** - * return current sequence number and then increase it by 1 - * - * @return current sequence number before increasing - */ - long getAndIncCurrentSeqno() { - long old = this.currentSeqno; - this.currentSeqno++; - return old; - } - - 
/** - * get last queued sequence number - * - * @return last queued sequence number - */ - long getLastQueuedSeqno() { - return lastQueuedSeqno; - } - - /** - * get the number of bytes of current block - * - * @return the number of bytes of current block - */ - long getBytesCurBlock() { - return bytesCurBlock; - } - - /** - * set the bytes of current block that have been written - * - * @param bytesCurBlock bytes of current block that have been written - */ - void setBytesCurBlock(long bytesCurBlock) { - this.bytesCurBlock = bytesCurBlock; - } - - /** - * increase bytes of current block by len. - * - * @param len how many bytes to increase to current block - */ - void incBytesCurBlock(long len) { - this.bytesCurBlock += len; - } - - /** - * set artificial slow down for unit test - * - * @param period artificial slow down - */ - void setArtificialSlowdown(long period) { - this.artificialSlowdown = period; - } - - /** - * if this streamer is to terminate - * - * @return if this streamer is to terminate - */ - boolean streamerClosed(){ - return streamerClosed; - } - - void closeSocket() throws IOException { - if (s != null) { - s.close(); - } - } - - @Override - public String toString() { - return (block == null? null: block.getLocalBlock()) - + "@" + Arrays.toString(getNodes()); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java index ef9f27a..b6bf6cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HdfsConfiguration.java @@ -31,9 +31,7 @@ public class HdfsConfiguration extends Configuration { addDeprecatedKeys(); // adds the default resources - Configuration.addDefaultResource("hdfs-default.xml"); - Configuration.addDefaultResource("hdfs-site.xml"); - + HdfsConfigurationLoader.init(); } public HdfsConfiguration() { @@ -52,9 +50,10 @@ public class HdfsConfiguration extends Configuration { * This method is here so that when invoked, HdfsConfiguration is class-loaded if * it hasn't already been previously loaded. Upon loading the class, the static * initializer block above will be executed to add the deprecated keys and to add - * the default resources. It is safe for this method to be called multiple times - * as the static initializer block will only get invoked once. - * + * the default resources via {@link HdfsConfigurationLoader#init()}. It is + * safe for this method to be called multiple times as the static initializer + * block will only get invoked once. 
+ * * This replaces the previously, dangerous practice of other classes calling * Configuration.addDefaultResource("hdfs-default.xml") directly without loading * HdfsConfiguration class first, thereby skipping the key deprecation http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java deleted file mode 100644 index f03e179..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs; - -import java.io.IOException; -import java.net.InetSocketAddress; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hdfs.net.Peer; -import org.apache.hadoop.hdfs.protocol.DatanodeID; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.security.token.Token; - [email protected] -public interface RemotePeerFactory { - /** - * @param addr The address to connect to. - * @param blockToken Token used during optional SASL negotiation - * @param datanodeId ID of destination DataNode - * @return A new Peer connected to the address. - * - * @throws IOException If there was an error connecting or creating - * the remote socket, encrypted stream, etc. - */ - Peer newConnectedPeer(InetSocketAddress addr, - Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) - throws IOException; -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java deleted file mode 100644 index ec17bb8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCipherSuiteException.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * Thrown when an unknown cipher suite is encountered. - */ [email protected] [email protected] -public class UnknownCipherSuiteException extends IOException { - public UnknownCipherSuiteException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java deleted file mode 100644 index 0aac8c8..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/UnknownCryptoProtocolVersionException.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - [email protected] [email protected] -public class UnknownCryptoProtocolVersionException extends IOException { - private static final long serialVersionUID = 8957192l; - - public UnknownCryptoProtocolVersionException() { - super(); - } - - public UnknownCryptoProtocolVersionException(String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java deleted file mode 100644 index 2655c40..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/XAttrHelper.java +++ /dev/null @@ -1,174 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hdfs; - -import java.util.List; -import java.util.Map; - -import org.apache.hadoop.HadoopIllegalArgumentException; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.fs.XAttr; -import org.apache.hadoop.fs.XAttr.NameSpace; -import org.apache.hadoop.util.StringUtils; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - [email protected] -public class XAttrHelper { - - /** - * Build <code>XAttr</code> from xattr name with prefix. - */ - public static XAttr buildXAttr(String name) { - return buildXAttr(name, null); - } - - /** - * Build <code>XAttr</code> from name with prefix and value. - * Name can not be null. Value can be null. The name and prefix - * are validated. - * Both name and namespace are case sensitive. - */ - public static XAttr buildXAttr(String name, byte[] value) { - Preconditions.checkNotNull(name, "XAttr name cannot be null."); - - final int prefixIndex = name.indexOf("."); - if (prefixIndex < 3) {// Prefix length is at least 3. 
- throw new HadoopIllegalArgumentException("An XAttr name must be " + - "prefixed with user/trusted/security/system/raw, followed by a '.'"); - } else if (prefixIndex == name.length() - 1) { - throw new HadoopIllegalArgumentException("XAttr name cannot be empty."); - } - - NameSpace ns; - final String prefix = name.substring(0, prefixIndex); - if (StringUtils.equalsIgnoreCase(prefix, NameSpace.USER.toString())) { - ns = NameSpace.USER; - } else if ( - StringUtils.equalsIgnoreCase(prefix, NameSpace.TRUSTED.toString())) { - ns = NameSpace.TRUSTED; - } else if ( - StringUtils.equalsIgnoreCase(prefix, NameSpace.SYSTEM.toString())) { - ns = NameSpace.SYSTEM; - } else if ( - StringUtils.equalsIgnoreCase(prefix, NameSpace.SECURITY.toString())) { - ns = NameSpace.SECURITY; - } else if ( - StringUtils.equalsIgnoreCase(prefix, NameSpace.RAW.toString())) { - ns = NameSpace.RAW; - } else { - throw new HadoopIllegalArgumentException("An XAttr name must be " + - "prefixed with user/trusted/security/system/raw, followed by a '.'"); - } - XAttr xAttr = (new XAttr.Builder()).setNameSpace(ns).setName(name. - substring(prefixIndex + 1)).setValue(value).build(); - - return xAttr; - } - - /** - * Build xattr name with prefix as <code>XAttr</code> list. - */ - public static List<XAttr> buildXAttrAsList(String name) { - XAttr xAttr = buildXAttr(name); - List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1); - xAttrs.add(xAttr); - - return xAttrs; - } - - /** - * Get value of first xattr from <code>XAttr</code> list - */ - public static byte[] getFirstXAttrValue(List<XAttr> xAttrs) { - byte[] value = null; - XAttr xAttr = getFirstXAttr(xAttrs); - if (xAttr != null) { - value = xAttr.getValue(); - if (value == null) { - value = new byte[0]; // xattr exists, but no value. 
- } - } - return value; - } - - /** - * Get first xattr from <code>XAttr</code> list - */ - public static XAttr getFirstXAttr(List<XAttr> xAttrs) { - if (xAttrs != null && !xAttrs.isEmpty()) { - return xAttrs.get(0); - } - - return null; - } - - /** - * Build xattr map from <code>XAttr</code> list, the key is - * xattr name with prefix, and value is xattr value. - */ - public static Map<String, byte[]> buildXAttrMap(List<XAttr> xAttrs) { - if (xAttrs == null) { - return null; - } - Map<String, byte[]> xAttrMap = Maps.newHashMap(); - for (XAttr xAttr : xAttrs) { - String name = getPrefixedName(xAttr); - byte[] value = xAttr.getValue(); - if (value == null) { - value = new byte[0]; - } - xAttrMap.put(name, value); - } - - return xAttrMap; - } - - /** - * Get name with prefix from <code>XAttr</code> - */ - public static String getPrefixedName(XAttr xAttr) { - if (xAttr == null) { - return null; - } - - return getPrefixedName(xAttr.getNameSpace(), xAttr.getName()); - } - - public static String getPrefixedName(XAttr.NameSpace ns, String name) { - return StringUtils.toLowerCase(ns.toString()) + "." + name; - } - - /** - * Build <code>XAttr</code> list from xattr name list. 
- */ - public static List<XAttr> buildXAttrs(List<String> names) { - if (names == null || names.isEmpty()) { - throw new HadoopIllegalArgumentException("XAttr names can not be " + - "null or empty."); - } - - List<XAttr> xAttrs = Lists.newArrayListWithCapacity(names.size()); - for (String name : names) { - xAttrs.add(buildXAttr(name, null)); - } - return xAttrs; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/94cbb6d1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java deleted file mode 100644 index e8ac686..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataInputStream.java +++ /dev/null @@ -1,113 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hdfs.client; - -import java.io.InputStream; -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.crypto.CryptoInputStream; -import org.apache.hadoop.hdfs.DFSInputStream; -import org.apache.hadoop.hdfs.protocol.DatanodeInfo; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; - -import com.google.common.base.Preconditions; - -/** - * The Hdfs implementation of {@link FSDataInputStream}. - */ [email protected] [email protected] -public class HdfsDataInputStream extends FSDataInputStream { - public HdfsDataInputStream(DFSInputStream in) throws IOException { - super(in); - } - - public HdfsDataInputStream(CryptoInputStream in) throws IOException { - super(in); - Preconditions.checkArgument(in.getWrappedStream() instanceof DFSInputStream, - "CryptoInputStream should wrap a DFSInputStream"); - } - - private DFSInputStream getDFSInputStream() { - if (in instanceof CryptoInputStream) { - return (DFSInputStream) ((CryptoInputStream) in).getWrappedStream(); - } - return (DFSInputStream) in; - } - - /** - * Get a reference to the wrapped output stream. We always want to return the - * actual underlying InputStream, even when we're using a CryptoStream. e.g. - * in the delegated methods below. - * - * @return the underlying output stream - */ - public InputStream getWrappedStream() { - return in; - } - - /** - * Get the datanode from which the stream is currently reading. - */ - public DatanodeInfo getCurrentDatanode() { - return getDFSInputStream().getCurrentDatanode(); - } - - /** - * Get the block containing the target position. 
- */ - public ExtendedBlock getCurrentBlock() { - return getDFSInputStream().getCurrentBlock(); - } - - /** - * Get the collection of blocks that has already been located. - */ - public List<LocatedBlock> getAllBlocks() throws IOException { - return getDFSInputStream().getAllBlocks(); - } - - /** - * Get the visible length of the file. It will include the length of the last - * block even if that is in UnderConstruction state. - * - * @return The visible length of the file. - */ - public long getVisibleLength() throws IOException { - return getDFSInputStream().getFileLength(); - } - - /** - * Get statistics about the reads which this DFSInputStream has done. - * Note that because HdfsDataInputStream is buffered, these stats may - * be higher than you would expect just by adding up the number of - * bytes read through HdfsDataInputStream. - */ - public DFSInputStream.ReadStatistics getReadStatistics() { - return getDFSInputStream().getReadStatistics(); - } - - public void clearReadStatistics() { - getDFSInputStream().clearReadStatistics(); - } -}
