http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java new file mode 100644 index 0000000..3a369ef --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopInputStream.java @@ -0,0 +1,626 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.hadoop.fs.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * GGFS input stream wrapper for Hadoop interfaces. + */ +@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") +public final class IgfsHadoopInputStream extends InputStream implements Seekable, PositionedReadable, + IgfsHadoopStreamEventListener { + /** Minimum buffer size. */ + private static final int MIN_BUF_SIZE = 4 * 1024; + + /** Server stream delegate. */ + private IgfsHadoopStreamDelegate delegate; + + /** Stream ID used by logger. */ + private long logStreamId; + + /** Stream position. */ + private long pos; + + /** Stream read limit. */ + private long limit; + + /** Mark position. */ + private long markPos = -1; + + /** Prefetch buffer. */ + private DoubleFetchBuffer buf = new DoubleFetchBuffer(); + + /** Buffer half size for double-buffering. */ + private int bufHalfSize; + + /** Closed flag. */ + private volatile boolean closed; + + /** Flag set if stream was closed due to connection breakage. */ + private boolean connBroken; + + /** Logger. */ + private Log log; + + /** Client logger. */ + private IgfsLogger clientLog; + + /** Read time. */ + private long readTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of read bytes. */ + private long total; + + /** + * Creates input stream. + * + * @param delegate Server stream delegate. + * @param limit Read limit. + * @param bufSize Buffer size. + * @param log Log. + * @param clientLog Client logger. + * @param logStreamId Stream ID used by logger.
+ */ + public IgfsHadoopInputStream(IgfsHadoopStreamDelegate delegate, long limit, int bufSize, Log log, + IgfsLogger clientLog, long logStreamId) { + assert limit >= 0; + + this.delegate = delegate; + this.limit = limit; + this.log = log; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + bufHalfSize = Math.max(bufSize, MIN_BUF_SIZE); + + lastTs = System.nanoTime(); + + delegate.hadoop().addEventListener(delegate, this); + } + + /** + * Read start. + */ + private void readStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. + */ + private void readEnd() { + long now = System.nanoTime(); + + readTime += now - lastTs; + + lastTs = now; + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + checkClosed(); + + readStart(); + + try { + if (eof()) + return -1; + + buf.refreshAhead(pos); + + int res = buf.atPosition(pos); + + pos++; + total++; + + buf.refreshAhead(pos); + + return res; + } + catch (IgniteCheckedException e) { + throw IgfsHadoopUtils.cast(e); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized int read(@NotNull byte[] b, int off, int len) throws IOException { + checkClosed(); + + if (eof()) + return -1; + + readStart(); + + try { + long remaining = limit - pos; + + int read = buf.flatten(b, pos, off, len); + + pos += read; + total += read; + remaining -= read; + + if (remaining > 0 && read != len) { + int readAmt = (int)Math.min(remaining, len - read); + + delegate.hadoop().readData(delegate, pos, readAmt, b, off + read, len - read).get(); + + read += readAmt; + pos += readAmt; + total += readAmt; + } + + buf.refreshAhead(pos); + + return read; + } + catch (IgniteCheckedException e) { + throw IgfsHadoopUtils.cast(e); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized long skip(long n) throws IOException { + checkClosed(); + + if (clientLog.isLogEnabled()) + clientLog.logSkip(logStreamId, n); + + long oldPos = pos; + + if (pos + n <= limit) + pos += n; + else + pos = limit; + + buf.refreshAhead(pos); + + return pos - oldPos; + } + + /** {@inheritDoc} */ + @Override public synchronized int available() throws IOException { + checkClosed(); + + int available = buf.available(pos); + + assert available >= 0; + + return available; + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + if (!closed) { + readStart(); + + if (log.isDebugEnabled()) + log.debug("Closing input stream: " + delegate); + + delegate.hadoop().closeStream(delegate); + + readEnd(); + + if (clientLog.isLogEnabled()) + clientLog.logCloseIn(logStreamId, userTime, readTime, total); + + markClosed(false); + + if (log.isDebugEnabled()) + log.debug("Closed stream [delegate=" + delegate + ", readTime=" + readTime + + ", userTime=" + userTime + ']'); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void mark(int readLimit) { + markPos = pos; + + if (clientLog.isLogEnabled()) + clientLog.logMark(logStreamId, readLimit); + } + + /** {@inheritDoc} */ + @Override public synchronized void reset() throws IOException { + checkClosed(); + + if (clientLog.isLogEnabled()) + clientLog.logReset(logStreamId); + + if (markPos == -1) + throw new IOException("Stream was not marked."); + + pos = markPos; + + buf.refreshAhead(pos); + } + + /** {@inheritDoc} */ + @Override public boolean markSupported() { + return true; + } + + /** {@inheritDoc} */ + @Override public synchronized int 
read(long position, byte[] buf, int off, int len) throws IOException { + long remaining = limit - position; + + int read = (int)Math.min(len, remaining); + + // Return -1 at EOF. + if (read == 0) + return -1; + + readFully(position, buf, off, read); + + return read; + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long position, byte[] buf, int off, int len) throws IOException { + long remaining = limit - position; + + checkClosed(); + + if (len > remaining) + throw new EOFException("End of stream reached before data was fully read."); + + readStart(); + + try { + int read = this.buf.flatten(buf, position, off, len); + + total += read; + + if (read != len) { + int readAmt = len - read; + + delegate.hadoop().readData(delegate, position + read, readAmt, buf, off + read, readAmt).get(); + + total += readAmt; + } + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, position, len); + } + catch (IgniteCheckedException e) { + throw IgfsHadoopUtils.cast(e); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public void readFully(long position, byte[] buf) throws IOException { + readFully(position, buf, 0, buf.length); + } + + /** {@inheritDoc} */ + @Override public synchronized void seek(long pos) throws IOException { + A.ensure(pos >= 0, "position must be non-negative"); + + checkClosed(); + + if (clientLog.isLogEnabled()) + clientLog.logSeek(logStreamId, pos); + + if (pos > limit) + pos = limit; + + if (log.isDebugEnabled()) + log.debug("Seek to position [delegate=" + delegate + ", pos=" + pos + ", oldPos=" + this.pos + ']'); + + this.pos = pos; + + buf.refreshAhead(pos); + } + + /** {@inheritDoc} */ + @Override public synchronized long getPos() { + return pos; + } + + /** {@inheritDoc} */ + @Override public synchronized boolean seekToNewSource(long targetPos) { + return false; + } + + /** {@inheritDoc} */ + @Override public void onClose() { + markClosed(true); + } + + /** {@inheritDoc} */ + @Override public void onError(String errMsg) { + // No-op. + } + + /** + * Marks stream as closed. + * + * @param connBroken {@code True} if connection with server was lost. + */ + private void markClosed(boolean connBroken) { + // It is ok to have race here. + if (!closed) { + closed = true; + + this.connBroken = connBroken; + + delegate.hadoop().removeEventListener(delegate); + } + } + + /** + * @throws IOException If check failed. + */ + private void checkClosed() throws IOException { + if (closed) { + if (connBroken) + throw new IOException("Server connection was lost."); + else + throw new IOException("Stream is closed."); + } + } + + /** + * @return {@code True} if end of stream reached. + */ + private boolean eof() { + return limit == pos; + } + + /** + * Part of the asynchronous prefetch buffer. + */ + private static class FetchBufferPart { + /** Read future. */ + private GridPlainFuture<byte[]> readFut; + + /** Position of cached chunk in file. */ + private long pos; + + /** Prefetch length. Need to store it as the read future result might not be available yet. */ + private int len; + + /** + * Creates fetch buffer part. + * + * @param readFut Read future for this buffer. + * @param pos Read position. + * @param len Chunk length. + */ + private FetchBufferPart(GridPlainFuture<byte[]> readFut, long pos, int len) { + this.readFut = readFut; + this.pos = pos; + this.len = len; + } + + /** + * Copies cached data if specified position matches cached region. + * + * @param dst Destination buffer. + * @param pos Read position in file.
+ * @param dstOff Offset in destination buffer from which to start writing. + * @param len Maximum number of bytes to copy. + * @return Number of bytes copied. + * @throws IgniteCheckedException If read future failed. + */ + public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { + // If read start position is within cached boundaries. + if (contains(pos)) { + byte[] data = readFut.get(); + + int srcPos = (int)(pos - this.pos); + int cpLen = Math.min(len, data.length - srcPos); + + U.arrayCopy(data, srcPos, dst, dstOff, cpLen); + + return cpLen; + } + + return 0; + } + + /** + * @return {@code True} if data is ready to be read. + */ + public boolean ready() { + return readFut.isDone(); + } + + /** + * Checks if current buffer part contains given position. + * + * @param pos Position to check. + * @return {@code True} if position matches buffer region. + */ + public boolean contains(long pos) { + return this.pos <= pos && this.pos + len > pos; + } + } + + /** Asynchronous double-buffered prefetch: two halves are fetched ahead and flipped as the read position advances. */ + private class DoubleFetchBuffer { + /** */ + private FetchBufferPart first; + + /** */ + private FetchBufferPart second; + + /** + * Copies fetched data from both buffers to destination array if cached region matches read position. + * + * @param dst Destination buffer. + * @param pos Read position in file. + * @param dstOff Destination buffer offset. + * @param len Maximum number of bytes to copy. + * @return Number of bytes copied. + * @throws IgniteCheckedException If any read operation failed. + */ + public int flatten(byte[] dst, long pos, int dstOff, int len) throws IgniteCheckedException { + assert dstOff >= 0; + assert dstOff + len <= dst.length : "Invalid indices [dst.length=" + dst.length + ", dstOff=" + dstOff + + ", len=" + len + ']'; + + int bytesCopied = 0; + + if (first != null) { + bytesCopied += first.flatten(dst, pos, dstOff, len); + + if (bytesCopied != len && second != null) { + assert second.pos == first.pos + first.len; + + bytesCopied += second.flatten(dst, pos + bytesCopied, dstOff + bytesCopied, len - bytesCopied); + } + } + + return bytesCopied; + } + + /** + * Gets byte at specified position in buffer. + * + * @param pos Stream position. + * @return Read byte. + * @throws IgniteCheckedException If read failed. + */ + public int atPosition(long pos) throws IgniteCheckedException { + // Should not reach here if stream contains no data. + assert first != null; + + if (first.contains(pos)) { + byte[] bytes = first.readFut.get(); + + return bytes[((int)(pos - first.pos))] & 0xFF; + } + else { + assert second != null; + assert second.contains(pos); + + byte[] bytes = second.readFut.get(); + + return bytes[((int)(pos - second.pos))] & 0xFF; + } + } + + /** + * Starts asynchronous buffer refresh if needed, depending on current position. + * + * @param pos Current stream position. + */ + public void refreshAhead(long pos) { + if (fullPrefetch(pos)) { + first = fetch(pos, bufHalfSize); + second = fetch(pos + bufHalfSize, bufHalfSize); + } + else if (needFlip(pos)) { + first = second; + + second = fetch(first.pos + first.len, bufHalfSize); + } + } + + /** + * @param pos Position from which read is expected. + * @return Number of bytes available to be read without blocking.
+ */ + public int available(long pos) { + int available = 0; + + if (first != null) { + if (first.contains(pos)) { + if (first.ready()) { + available += (first.pos + first.len - pos); + + if (second != null && second.ready()) + available += second.len; + } + } + else { + if (second != null && second.contains(pos) && second.ready()) + available += (second.pos + second.len - pos); + } + } + + return available; + } + + /** + * Checks if position has shifted enough to forget the previous buffer. + * + * @param pos Current position. + * @return {@code True} if buffers need to be flipped. + */ + private boolean needFlip(long pos) { + // Return true if read position has entered the second buffer. + return second != null && second.contains(pos); + } + + /** + * Determines if all cached bytes should be discarded and a new region should be + * prefetched. + * + * @param curPos Current stream position. + * @return {@code True} if both blocks need to be refreshed. + */ + private boolean fullPrefetch(long curPos) { + // If no data was prefetched yet, return true. + return first == null || curPos < first.pos || (second != null && curPos >= second.pos + second.len); + } + + /** + * Starts asynchronous fetch for given region. + * + * @param pos Position to read from. + * @param size Number of bytes to read. + * @return Fetch buffer part. + */ + private FetchBufferPart fetch(long pos, int size) { + long remaining = limit - pos; + + size = (int)Math.min(size, remaining); + + return size <= 0 ? null : + new FetchBufferPart(delegate.hadoop().readData(delegate, pos, size, null, 0, 0), pos, size); + } + } +}
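The DoubleFetchBuffer above is a double-buffered read-ahead scheme: the stream keeps two prefetched halves of the configured buffer, serves reads from whichever half covers the current position, and flips the halves once the read position crosses into the second one, immediately scheduling a fetch of the next region. Below is a minimal, self-contained sketch of that flip/prefetch logic; it is illustrative only and not part of the commit. ByteSource, DoubleFetchSketch and the single-thread ExecutorService are hypothetical stand-ins for the IGFS stream delegate and its GridPlainFuture-based readData() call.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Sketch of double-buffered read-ahead; ByteSource stands in for the IGFS stream delegate. */
final class DoubleFetchSketch {
    /** Hypothetical blocking random-access byte source. */
    interface ByteSource {
        byte[] read(long pos, int len) throws Exception;
    }

    /** One prefetched half: its file position, length and the pending fetch. */
    private static final class Part {
        final Future<byte[]> fut;
        final long pos;
        final int len;

        Part(Future<byte[]> fut, long pos, int len) {
            this.fut = fut;
            this.pos = pos;
            this.len = len;
        }

        boolean contains(long p) {
            return pos <= p && p < pos + len;
        }
    }

    private final ExecutorService exec = Executors.newSingleThreadExecutor();
    private final ByteSource src;
    private final long limit; // Stream length.
    private final int half;   // Size of each buffer half.

    private Part first;
    private Part second;

    DoubleFetchSketch(ByteSource src, long limit, int half) {
        this.src = src;
        this.limit = limit;
        this.half = half;
    }

    /** Mirrors refreshAhead(): full refetch when pos leaves the window, flip when it enters the second half. */
    void refreshAhead(long pos) {
        boolean fullPrefetch = first == null || pos < first.pos
            || (second != null && pos >= second.pos + second.len);

        if (fullPrefetch) {
            first = fetch(pos, half);
            second = fetch(pos + half, half);
        }
        else if (second != null && second.contains(pos)) {
            first = second; // Forget the consumed half.
            second = fetch(first.pos + first.len, half); // Prefetch the next half.
        }
    }

    /** Schedules an asynchronous fetch, trimming the request at the stream limit. */
    private Part fetch(long pos, int size) {
        final int sz = (int)Math.min(size, limit - pos);

        if (sz <= 0)
            return null;

        final long p = pos;

        return new Part(exec.submit(() -> src.read(p, sz)), pos, sz);
    }

    /** Reads one byte, blocking only if the covering prefetch has not finished yet. */
    int byteAt(long pos) throws Exception {
        refreshAhead(pos);

        Part part = first != null && first.contains(pos) ? first : second;

        if (part == null)
            return -1; // EOF.

        byte[] data = part.fut.get();

        return data[(int)(pos - part.pos)] & 0xFF;
    }
}

With this layout, sequential reads almost never block: while one half is being consumed, the other is already being fetched in the background, at the cost of holding two half-size buffers in memory.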
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java new file mode 100644 index 0000000..7f87e10 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIo.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.ignite.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.util.lang.*; +import org.jetbrains.annotations.*; + +/** + * IO abstraction layer for GGFS client. Two kinds of messages are expected to be sent: requests with a response + * and requests without a response. + */ +public interface IgfsHadoopIo { + /** + * Sends given GGFS client message and asynchronously awaits a response. + * + * @param msg Message to send. + * @return Future that will be completed with the response message. + * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). + */ + public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException; + + /** + * Sends given GGFS client message and asynchronously awaits a response. When IO detects the beginning + * of the response for the given message, it stops reading data and passes the input stream to a closure + * which can read the response in a specific way. + * + * @param msg Message to send. + * @param outBuf Output buffer. If {@code null}, the output buffer is not used. + * @param outOff Output buffer offset. + * @param outLen Output buffer length. + * @return Future that will be completed when response is returned from closure. + * @throws IgniteCheckedException If a message cannot be sent (connection is broken or client was closed). + */ + public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, int outLen) + throws IgniteCheckedException; + + /** + * Sends given message and does not wait for response. + * + * @param msg Message to send. + * @throws IgniteCheckedException If send failed. + */ + public void sendPlain(IgfsMessage msg) throws IgniteCheckedException; + + /** + * Adds event listener that will be invoked when connection with server is lost or remote error has occurred. + * If connection is closed already, callback will be invoked synchronously inside this method. + * + * @param lsnr Event listener.
+ */ + public void addEventListener(IgfsHadoopIpcIoListener lsnr); + + /** + * Removes event listener that will be invoked when connection with server is lost or remote error has occurred. + * + * @param lsnr Event listener. + */ + public void removeEventListener(IgfsHadoopIpcIoListener lsnr); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIo.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIo.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIo.java new file mode 100644 index 0000000..5e2d03b --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIo.java @@ -0,0 +1,599 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.util.*; +import org.apache.ignite.internal.util.ipc.*; +import org.apache.ignite.internal.util.ipc.shmem.*; +import org.apache.ignite.internal.util.lang.*; +import org.apache.ignite.internal.util.typedef.internal.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; +import java.util.concurrent.locks.*; + +/** + * IO layer implementation based on blocking IPC streams. + */ +@SuppressWarnings("FieldAccessedSynchronizedAndUnsynchronized") +public class IgfsHadoopIpcIo implements IgfsHadoopIo { + /** Logger. */ + private Log log; + + /** Request futures map. */ + private ConcurrentMap<Long, IgfsHadoopFuture> reqMap = + new ConcurrentHashMap8<>(); + + /** Request ID counter. */ + private AtomicLong reqIdCnt = new AtomicLong(); + + /** Endpoint. */ + private IpcEndpoint endpoint; + + /** Endpoint output stream. */ + private IgfsDataOutputStream out; + + /** Protocol. */ + private final IgfsMarshaller marsh; + + /** Client reader thread. */ + private Thread reader; + + /** Lock for graceful shutdown. */ + private final ReadWriteLock busyLock = new ReentrantReadWriteLock(); + + /** Stopping flag. */ + private volatile boolean stopping; + + /** Server endpoint address. */ + private final String endpointAddr; + + /** Number of open file system sessions. */ + private final AtomicInteger activeCnt = new AtomicInteger(1); + + /** Event listeners. 
*/ + private final Collection<IgfsHadoopIpcIoListener> lsnrs = + new GridConcurrentHashSet<>(); + + /** Cached connections. */ + private static final ConcurrentMap<String, IgfsHadoopIpcIo> ipcCache = + new ConcurrentHashMap8<>(); + + /** Striped lock that prevents multiple instance creation in {@link #get(Log, String)}. */ + private static final GridStripedLock initLock = new GridStripedLock(32); + + /** + * @param endpointAddr Endpoint. + * @param marsh Protocol. + * @param log Logger to use. + */ + public IgfsHadoopIpcIo(String endpointAddr, IgfsMarshaller marsh, Log log) { + assert endpointAddr != null; + assert marsh != null; + + this.endpointAddr = endpointAddr; + this.marsh = marsh; + this.log = log; + } + + /** + * Returns a started and valid instance of this class + * for a given endpoint. + * + * @param log Logger to use for new instance. + * @param endpoint Endpoint string. + * @return New or existing cached instance, which is started and operational. + * @throws IOException If new instance was created but failed to start. + */ + public static IgfsHadoopIpcIo get(Log log, String endpoint) throws IOException { + while (true) { + IgfsHadoopIpcIo clientIo = ipcCache.get(endpoint); + + if (clientIo != null) { + if (clientIo.acquire()) + return clientIo; + else + // If concurrent close. + ipcCache.remove(endpoint, clientIo); + } + else { + Lock lock = initLock.getLock(endpoint); + + lock.lock(); + + try { + clientIo = ipcCache.get(endpoint); + + if (clientIo != null) { // Perform double check. + if (clientIo.acquire()) + return clientIo; + else + // If concurrent close. + ipcCache.remove(endpoint, clientIo); + } + + // Otherwise try creating a new one. + clientIo = new IgfsHadoopIpcIo(endpoint, new IgfsMarshaller(), log); + + try { + clientIo.start(); + } + catch (IgniteCheckedException e) { + throw new IOException(e.getMessage(), e); + } + + IgfsHadoopIpcIo old = ipcCache.putIfAbsent(endpoint, clientIo); + + // Put in exclusive lock. + assert old == null; + + return clientIo; + } + finally { + lock.unlock(); + } + } + } + } + + /** + * Increases usage count for this instance. + * + * @return {@code true} if usage count is greater than zero. + */ + private boolean acquire() { + while (true) { + int cnt = activeCnt.get(); + + if (cnt == 0) { + if (log.isDebugEnabled()) + log.debug("IPC IO not acquired (count was 0): " + this); + + return false; + } + + // Need to make sure that no-one decremented count in between. + if (activeCnt.compareAndSet(cnt, cnt + 1)) { + if (log.isDebugEnabled()) + log.debug("IPC IO acquired: " + this); + + return true; + } + } + } + + /** + * Releases this instance, decrementing usage count. + * <p> + * If usage count becomes zero, the instance is stopped + * and removed from cache. + */ + public void release() { + while (true) { + int cnt = activeCnt.get(); + + if (cnt == 0) { + if (log.isDebugEnabled()) + log.debug("IPC IO not released (count was 0): " + this); + + return; + } + + if (activeCnt.compareAndSet(cnt, cnt - 1)) { + if (cnt == 1) { + ipcCache.remove(endpointAddr, this); + + if (log.isDebugEnabled()) + log.debug("IPC IO stopping as unused: " + this); + + stop(); + } + else if (log.isDebugEnabled()) + log.debug("IPC IO released: " + this); + + return; + } + } + } + + /** + * Closes this IO instance, removing it from cache. + */ + public void forceClose() { + if (ipcCache.remove(endpointAddr, this)) + stop(); + } + + /** + * Starts the IO. + * + * @throws IgniteCheckedException If failed to connect the endpoint. 
+ */ + private void start() throws IgniteCheckedException { + boolean success = false; + + try { + endpoint = IpcEndpointFactory.connectEndpoint( + endpointAddr, new GridLoggerProxy(new IgfsHadoopJclLogger(log), null, null, "")); + + out = new IgfsDataOutputStream(new BufferedOutputStream(endpoint.outputStream())); + + reader = new ReaderThread(); + + // Required for Hadoop 2.x + reader.setDaemon(true); + + reader.start(); + + success = true; + } + catch (IgniteCheckedException e) { + IpcOutOfSystemResourcesException resEx = e.getCause(IpcOutOfSystemResourcesException.class); + + if (resEx != null) + throw new IgniteCheckedException(IpcSharedMemoryServerEndpoint.OUT_OF_RESOURCES_MSG, resEx); + + throw e; + } + finally { + if (!success) + stop(); + } + } + + /** + * Shuts down the IO. No send requests will be accepted anymore, all pending futures will be failed. + * Close listeners will be invoked as if connection is closed by server. + */ + private void stop() { + close0(null); + + if (reader != null) { + try { + U.interrupt(reader); + U.join(reader); + + reader = null; + } + catch (IgniteInterruptedCheckedException ignored) { + Thread.currentThread().interrupt(); + + log.warn("Got interrupted while waiting for reader thread to shut down (will return)."); + } + } + } + + /** {@inheritDoc} */ + @Override public void addEventListener(IgfsHadoopIpcIoListener lsnr) { + if (!busyLock.readLock().tryLock()) { + lsnr.onClose(); + + return; + } + + boolean invokeNow = false; + + try { + invokeNow = stopping; + + if (!invokeNow) + lsnrs.add(lsnr); + } + finally { + busyLock.readLock().unlock(); + + if (invokeNow) + lsnr.onClose(); + } + } + + /** {@inheritDoc} */ + @Override public void removeEventListener(IgfsHadoopIpcIoListener lsnr) { + lsnrs.remove(lsnr); + } + + /** {@inheritDoc} */ + @Override public GridPlainFuture<IgfsMessage> send(IgfsMessage msg) throws IgniteCheckedException { + return send(msg, null, 0, 0); + } + + /** {@inheritDoc} */ + @Override public <T> GridPlainFuture<T> send(IgfsMessage msg, @Nullable byte[] outBuf, int outOff, + int outLen) throws IgniteCheckedException { + assert outBuf == null || msg.command() == IgfsIpcCommand.READ_BLOCK; + + if (!busyLock.readLock().tryLock()) + throw new IgfsHadoopCommunicationException("Failed to send message (client is being concurrently " + + "closed)."); + + try { + if (stopping) + throw new IgfsHadoopCommunicationException("Failed to send message (client is being concurrently " + + "closed)."); + + long reqId = reqIdCnt.getAndIncrement(); + + IgfsHadoopFuture<T> fut = new IgfsHadoopFuture<>(); + + fut.outputBuffer(outBuf); + fut.outputOffset(outOff); + fut.outputLength(outLen); + fut.read(msg.command() == IgfsIpcCommand.READ_BLOCK); + + IgfsHadoopFuture oldFut = reqMap.putIfAbsent(reqId, fut); + + assert oldFut == null; + + if (log.isDebugEnabled()) + log.debug("Sending GGFS message [reqId=" + reqId + ", msg=" + msg + ']'); + + byte[] hdr = IgfsMarshaller.createHeader(reqId, msg.command()); + + IgniteCheckedException err = null; + + try { + synchronized (this) { + marsh.marshall(msg, hdr, out); + + out.flush(); // Blocking operation + sometimes system call. 
+ } + } + catch (IgniteCheckedException e) { + err = e; + } + catch (IOException e) { + err = new IgfsHadoopCommunicationException(e); + } + + if (err != null) { + reqMap.remove(reqId, fut); + + fut.onDone(err); + } + + return fut; + } + finally { + busyLock.readLock().unlock(); + } + } + + /** {@inheritDoc} */ + @Override public void sendPlain(IgfsMessage msg) throws IgniteCheckedException { + if (!busyLock.readLock().tryLock()) + throw new IgfsHadoopCommunicationException("Failed to send message (client is being " + + "concurrently closed)."); + + try { + if (stopping) + throw new IgfsHadoopCommunicationException("Failed to send message (client is being concurrently closed)."); + + assert msg.command() == IgfsIpcCommand.WRITE_BLOCK; + + IgfsStreamControlRequest req = (IgfsStreamControlRequest)msg; + + byte[] hdr = IgfsMarshaller.createHeader(-1, IgfsIpcCommand.WRITE_BLOCK); + + U.longToBytes(req.streamId(), hdr, 12); + U.intToBytes(req.length(), hdr, 20); + + synchronized (this) { + out.write(hdr); + out.write(req.data(), (int)req.position(), req.length()); + + out.flush(); + } + } + catch (IOException e) { + throw new IgfsHadoopCommunicationException(e); + } + finally { + busyLock.readLock().unlock(); + } + } + + /** + * Closes client but does not wait. + * + * @param err Error. + */ + private void close0(@Nullable Throwable err) { + busyLock.writeLock().lock(); + + try { + if (stopping) + return; + + stopping = true; + } + finally { + busyLock.writeLock().unlock(); + } + + if (err == null) + err = new IgniteCheckedException("Failed to perform request (connection was concurrently closed before response " + + "is received)."); + + // Clean up resources. + U.closeQuiet(out); + + if (endpoint != null) + endpoint.close(); + + // Unwind futures. We can safely iterate here because no more futures will be added. + Iterator<IgfsHadoopFuture> it = reqMap.values().iterator(); + + while (it.hasNext()) { + IgfsHadoopFuture fut = it.next(); + + fut.onDone(err); + + it.remove(); + } + + for (IgfsHadoopIpcIoListener lsnr : lsnrs) + lsnr.onClose(); + } + + /** + * Do not extend {@code GridThread} to minimize class dependencies. + */ + private class ReaderThread extends Thread { + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + @Override public void run() { + // Error to fail pending futures. + Throwable err = null; + + try { + InputStream in = endpoint.inputStream(); + + IgfsDataInputStream dis = new IgfsDataInputStream(in); + + byte[] hdr = new byte[IgfsMarshaller.HEADER_SIZE]; + byte[] msgHdr = new byte[IgfsControlResponse.RES_HEADER_SIZE]; + + while (!Thread.currentThread().isInterrupted()) { + dis.readFully(hdr); + + long reqId = U.bytesToLong(hdr, 0); + + // We don't wait for write responses, therefore reqId is -1. + if (reqId == -1) { + // We received a response which normally should not be sent. It must contain an error. + dis.readFully(msgHdr); + + assert msgHdr[4] != 0; + + String errMsg = dis.readUTF(); + + // Error code. 
+ dis.readInt(); + + long streamId = dis.readLong(); + + for (IgfsHadoopIpcIoListener lsnr : lsnrs) + lsnr.onError(streamId, errMsg); + } + else { + IgfsHadoopFuture<Object> fut = reqMap.remove(reqId); + + if (fut == null) { + String msg = "Failed to read response from server: response closure is unavailable for " + + "requestId (will close connection):" + reqId; + + log.warn(msg); + + err = new IgniteCheckedException(msg); + + break; + } + else { + try { + IgfsIpcCommand cmd = IgfsIpcCommand.valueOf(U.bytesToInt(hdr, 8)); + + if (log.isDebugEnabled()) + log.debug("Received GGFS response [reqId=" + reqId + ", cmd=" + cmd + ']'); + + Object res = null; + + if (fut.read()) { + dis.readFully(msgHdr); + + boolean hasErr = msgHdr[4] != 0; + + if (hasErr) { + String errMsg = dis.readUTF(); + + // Error code. + Integer errCode = dis.readInt(); + + IgfsControlResponse.throwError(errCode, errMsg); + } + + int blockLen = U.bytesToInt(msgHdr, 5); + + int readLen = Math.min(blockLen, fut.outputLength()); + + if (readLen > 0) { + assert fut.outputBuffer() != null; + + dis.readFully(fut.outputBuffer(), fut.outputOffset(), readLen); + } + + if (readLen != blockLen) { + byte[] buf = new byte[blockLen - readLen]; + + dis.readFully(buf); + + res = buf; + } + } + else + res = marsh.unmarshall(cmd, hdr, dis); + + fut.onDone(res); + } + catch (IgniteCheckedException e) { + if (log.isDebugEnabled()) + log.debug("Failed to apply response closure (will fail request future): " + + e.getMessage()); + + fut.onDone(e); + + err = e; + } + } + } + } + } + catch (EOFException ignored) { + err = new IgniteCheckedException("Failed to read response from server (connection was closed by remote peer)."); + } + catch (IOException e) { + if (!stopping) + log.error("Failed to read data (connection will be closed)", e); + + err = new IgfsHadoopCommunicationException(e); + } + catch (IgniteCheckedException e) { + if (!stopping) + log.error("Failed to obtain endpoint input stream (connection will be closed)", e); + + err = e; + } + finally { + close0(err); + } + } + } + + /** {@inheritDoc} */ + @Override public String toString() { + return getClass().getSimpleName() + " [endpointAddr=" + endpointAddr + ", activeCnt=" + activeCnt + + ", stopping=" + stopping + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIoListener.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIoListener.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIoListener.java new file mode 100644 index 0000000..ffc58ba --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopIpcIoListener.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +/** + * Listens to the events of {@link IgfsHadoopIpcIo}. + */ +public interface IgfsHadoopIpcIoListener { + /** + * Callback invoked when the IO is being closed. + */ + public void onClose(); + + /** + * Callback invoked when remote error occurs. + * + * @param streamId Stream ID. + * @param errMsg Error message. + */ + public void onError(long streamId, String errMsg); +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopJclLogger.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopJclLogger.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopJclLogger.java new file mode 100644 index 0000000..344ba8f --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopJclLogger.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.jetbrains.annotations.*; + +/** + * JCL logger wrapper for Hadoop. + */ +public class IgfsHadoopJclLogger implements IgniteLogger { + /** */ + private static final long serialVersionUID = 0L; + + /** JCL implementation proxy. */ + private Log impl; + + /** + * Constructor. + * + * @param impl JCL implementation to use. + */ + IgfsHadoopJclLogger(Log impl) { + assert impl != null; + + this.impl = impl; + } + + /** {@inheritDoc} */ + @Override public IgniteLogger getLogger(Object ctgr) { + return new IgfsHadoopJclLogger(LogFactory.getLog( + ctgr instanceof Class ? 
((Class)ctgr).getName() : String.valueOf(ctgr))); + } + + /** {@inheritDoc} */ + @Override public void trace(String msg) { + impl.trace(msg); + } + + /** {@inheritDoc} */ + @Override public void debug(String msg) { + impl.debug(msg); + } + + /** {@inheritDoc} */ + @Override public void info(String msg) { + impl.info(msg); + } + + /** {@inheritDoc} */ + @Override public void warning(String msg) { + impl.warn(msg); + } + + /** {@inheritDoc} */ + @Override public void warning(String msg, @Nullable Throwable e) { + impl.warn(msg, e); + } + + /** {@inheritDoc} */ + @Override public void error(String msg) { + impl.error(msg); + } + + /** {@inheritDoc} */ + @Override public boolean isQuiet() { + return !isInfoEnabled() && !isDebugEnabled(); + } + + /** {@inheritDoc} */ + @Override public void error(String msg, @Nullable Throwable e) { + impl.error(msg, e); + } + + /** {@inheritDoc} */ + @Override public boolean isTraceEnabled() { + return impl.isTraceEnabled(); + } + + /** {@inheritDoc} */ + @Override public boolean isDebugEnabled() { + return impl.isDebugEnabled(); + } + + /** {@inheritDoc} */ + @Override public boolean isInfoEnabled() { + return impl.isInfoEnabled(); + } + + /** {@inheritDoc} */ + @Nullable @Override public String fileName() { + return null; + } + + /** {@inheritDoc} */ + @Override public String toString() { + return "IgfsHadoopJclLogger [impl=" + impl + ']'; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutProc.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutProc.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutProc.java new file mode 100644 index 0000000..a88b8d0 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutProc.java @@ -0,0 +1,466 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.igfs.common.*; +import org.apache.ignite.internal.processors.fs.*; +import org.apache.ignite.internal.util.lang.*; +import org.jdk8.backport.*; +import org.jetbrains.annotations.*; + +import java.io.*; +import java.util.*; + +import static org.apache.ignite.internal.igfs.common.IgfsIpcCommand.*; + +/** + * Communication with external process (TCP or shmem). + */ +public class IgfsHadoopOutProc implements IgfsHadoopEx, IgfsHadoopIpcIoListener { + /** Expected result is boolean.
*/ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Boolean> BOOL_RES = createClosure(); + + /** Expected result is long. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, Long> LONG_RES = createClosure(); + + /** Expected result is {@code IgfsFile}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsFile> FILE_RES = createClosure(); + + /** Expected result is {@code IgfsHandshakeResponse}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + IgfsHandshakeResponse> HANDSHAKE_RES = createClosure(); + + /** Expected result is {@code IgfsStatus}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsStatus> STATUS_RES = + createClosure(); + + /** Expected result is {@code IgfsInputStreamDescriptor}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + IgfsInputStreamDescriptor> STREAM_DESCRIPTOR_RES = createClosure(); + + /** Expected result is a collection of {@code IgfsFile}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + Collection<IgfsFile>> FILE_COL_RES = createClosure(); + + /** Expected result is a collection of {@code IgfsPath}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + Collection<IgfsPath>> PATH_COL_RES = createClosure(); + + /** Expected result is {@code IgfsPathSummary}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, IgfsPathSummary> SUMMARY_RES = + createClosure(); + + /** Expected result is a collection of {@code IgfsBlockLocation}. */ + private static final GridPlainClosure<GridPlainFuture<IgfsMessage>, + Collection<IgfsBlockLocation>> BLOCK_LOCATION_COL_RES = createClosure(); + + /** Grid name. */ + private final String grid; + + /** GGFS name. */ + private final String ggfs; + + /** Client log. */ + private final Log log; + + /** Client IO. */ + private final IgfsHadoopIpcIo io; + + /** Event listeners. */ + private final Map<Long, IgfsHadoopStreamEventListener> lsnrs = new ConcurrentHashMap8<>(); + + /** + * Constructor for TCP endpoint. + * + * @param host Host. + * @param port Port. + * @param grid Grid name. + * @param ggfs GGFS name. + * @param log Client logger. + * @throws IOException If failed. + */ + public IgfsHadoopOutProc(String host, int port, String grid, String ggfs, Log log) throws IOException { + this(host, port, grid, ggfs, false, log); + } + + /** + * Constructor for shmem endpoint. + * + * @param port Port. + * @param grid Grid name. + * @param ggfs GGFS name. + * @param log Client logger. + * @throws IOException If failed. + */ + public IgfsHadoopOutProc(int port, String grid, String ggfs, Log log) throws IOException { + this(null, port, grid, ggfs, true, log); + } + + /** + * Constructor. + * + * @param host Host. + * @param port Port. + * @param grid Grid name. + * @param ggfs GGFS name. + * @param shmem Shared memory flag. + * @param log Client logger. + * @throws IOException If failed. + */ + private IgfsHadoopOutProc(String host, int port, String grid, String ggfs, boolean shmem, Log log) + throws IOException { + assert host != null && !shmem || host == null && shmem : + "Invalid arguments [host=" + host + ", port=" + port + ", shmem=" + shmem + ']'; + + String endpoint = host != null ?
host + ":" + port : "shmem:" + port; + + this.grid = grid; + this.ggfs = ggfs; + this.log = log; + + io = IgfsHadoopIpcIo.get(log, endpoint); + + io.addEventListener(this); + } + + /** {@inheritDoc} */ + @Override public IgfsHandshakeResponse handshake(String logDir) throws IgniteCheckedException { + final IgfsHandshakeRequest req = new IgfsHandshakeRequest(); + + req.gridName(grid); + req.ggfsName(ggfs); + req.logDirectory(logDir); + + return io.send(req).chain(HANDSHAKE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public void close(boolean force) { + assert io != null; + + io.removeEventListener(this); + + if (force) + io.forceClose(); + else + io.release(); + } + + /** {@inheritDoc} */ + @Override public IgfsFile info(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(INFO); + msg.path(path); + + return io.send(msg).chain(FILE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(UPDATE); + msg.path(path); + msg.properties(props); + + return io.send(msg).chain(FILE_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(SET_TIMES); + msg.path(path); + msg.accessTime(accessTime); + msg.modificationTime(modificationTime); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean rename(IgfsPath src, IgfsPath dest) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(RENAME); + msg.path(src); + msg.destinationPath(dest); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean delete(IgfsPath path, boolean recursive) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(DELETE); + msg.path(path); + msg.flag(recursive); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) + throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(AFFINITY); + msg.path(path); + msg.start(start); + msg.length(len); + + return io.send(msg).chain(BLOCK_LOCATION_COL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsPathSummary contentSummary(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(PATH_SUMMARY); + msg.path(path); + + return io.send(msg).chain(SUMMARY_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Boolean mkdirs(IgfsPath path, Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(MAKE_DIRECTORIES); + msg.path(path); + msg.properties(props); + + return io.send(msg).chain(BOOL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(LIST_FILES); + msg.path(path); + + return io.send(msg).chain(FILE_COL_RES).get(); + } + + 
/** {@inheritDoc} */ + @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(LIST_PATHS); + msg.path(path); + + return io.send(msg).chain(PATH_COL_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsStatus fsStatus() throws IgniteCheckedException { + return io.send(new IgfsStatusRequest()).chain(STATUS_RES).get(); + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate open(IgfsPath path) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_READ); + msg.path(path); + msg.flag(false); + + IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); + + return new IgfsHadoopStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate open(IgfsPath path, + int seqReadsBeforePrefetch) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_READ); + msg.path(path); + msg.flag(true); + msg.sequentialReadsBeforePrefetch(seqReadsBeforePrefetch); + + IgfsInputStreamDescriptor rmtDesc = io.send(msg).chain(STREAM_DESCRIPTOR_RES).get(); + + return new IgfsHadoopStreamDelegate(this, rmtDesc.streamId(), rmtDesc.length()); + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate create(IgfsPath path, boolean overwrite, boolean colocate, + int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_CREATE); + msg.path(path); + msg.flag(overwrite); + msg.colocate(colocate); + msg.properties(props); + msg.replication(replication); + msg.blockSize(blockSize); + + Long streamId = io.send(msg).chain(LONG_RES).get(); + + return new IgfsHadoopStreamDelegate(this, streamId); + } + + /** {@inheritDoc} */ + @Override public IgfsHadoopStreamDelegate append(IgfsPath path, boolean create, + @Nullable Map<String, String> props) throws IgniteCheckedException { + final IgfsPathControlRequest msg = new IgfsPathControlRequest(); + + msg.command(OPEN_APPEND); + msg.path(path); + msg.flag(create); + msg.properties(props); + + Long streamId = io.send(msg).chain(LONG_RES).get(); + + return new IgfsHadoopStreamDelegate(this, streamId); + } + + /** {@inheritDoc} */ + @Override public GridPlainFuture<byte[]> readData(IgfsHadoopStreamDelegate desc, long pos, int len, + final @Nullable byte[] outBuf, final int outOff, final int outLen) { + assert len > 0; + + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(READ_BLOCK); + msg.streamId((long) desc.target()); + msg.position(pos); + msg.length(len); + + try { + return io.send(msg, outBuf, outOff, outLen); + } + catch (IgniteCheckedException e) { + return new GridPlainFutureAdapter<>(e); + } + } + + /** {@inheritDoc} */ + @Override public void writeData(IgfsHadoopStreamDelegate desc, byte[] data, int off, int len) + throws IOException { + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(WRITE_BLOCK); + msg.streamId((long) desc.target()); + msg.data(data); + msg.position(off); + msg.length(len); + + try { + io.sendPlain(msg); + } + catch (IgniteCheckedException e) { + throw IgfsHadoopUtils.cast(e); + } + } + + /** {@inheritDoc} */ + @Override public void flush(IgfsHadoopStreamDelegate delegate) 
throws IOException { + // No-op. + } + + /** {@inheritDoc} */ + @Override public void closeStream(IgfsHadoopStreamDelegate desc) throws IOException { + final IgfsStreamControlRequest msg = new IgfsStreamControlRequest(); + + msg.command(CLOSE); + msg.streamId((long)desc.target()); + + try { + io.send(msg).chain(BOOL_RES).get(); + } + catch (IgniteCheckedException e) { + throw IgfsHadoopUtils.cast(e); + } + } + + /** {@inheritDoc} */ + @Override public void addEventListener(IgfsHadoopStreamDelegate desc, + IgfsHadoopStreamEventListener lsnr) { + long streamId = desc.target(); + + IgfsHadoopStreamEventListener lsnr0 = lsnrs.put(streamId, lsnr); + + assert lsnr0 == null || lsnr0 == lsnr; + + if (log.isDebugEnabled()) + log.debug("Added stream event listener [streamId=" + streamId + ']'); + } + + /** {@inheritDoc} */ + @Override public void removeEventListener(IgfsHadoopStreamDelegate desc) { + long streamId = desc.target(); + + IgfsHadoopStreamEventListener lsnr0 = lsnrs.remove(streamId); + + if (lsnr0 != null && log.isDebugEnabled()) + log.debug("Removed stream event listener [streamId=" + streamId + ']'); + } + + /** {@inheritDoc} */ + @Override public void onClose() { + for (IgfsHadoopStreamEventListener lsnr : lsnrs.values()) { + try { + lsnr.onClose(); + } + catch (IgniteCheckedException e) { + log.warn("Got exception from stream event listener (will ignore): " + lsnr, e); + } + } + } + + /** {@inheritDoc} */ + @Override public void onError(long streamId, String errMsg) { + IgfsHadoopStreamEventListener lsnr = lsnrs.get(streamId); + + if (lsnr != null) + lsnr.onError(errMsg); + else + log.warn("Received write error response for not registered output stream (will ignore) " + + "[streamId= " + streamId + ']'); + } + + /** + * Creates conversion closure for given type. + * + * @param <T> Type of expected result. + * @return Conversion closure. + */ + @SuppressWarnings("unchecked") + private static <T> GridPlainClosure<GridPlainFuture<IgfsMessage>, T> createClosure() { + return new GridPlainClosure<GridPlainFuture<IgfsMessage>, T>() { + @Override public T apply(GridPlainFuture<IgfsMessage> fut) throws IgniteCheckedException { + IgfsControlResponse res = (IgfsControlResponse)fut.get(); + + if (res.hasError()) + res.throwError(); + + return (T)res.response(); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutputStream.java new file mode 100644 index 0000000..6a5bcf1 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopOutputStream.java @@ -0,0 +1,201 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.commons.logging.*; +import org.apache.ignite.*; +import org.apache.ignite.internal.igfs.common.*; +import org.jetbrains.annotations.*; + +import java.io.*; + +/** + * GGFS Hadoop output stream implementation. + */ +public class IgfsHadoopOutputStream extends OutputStream implements IgfsHadoopStreamEventListener { + /** Log instance. */ + private Log log; + + /** Client logger. */ + private IgfsLogger clientLog; + + /** Log stream ID. */ + private long logStreamId; + + /** Server stream delegate. */ + private IgfsHadoopStreamDelegate delegate; + + /** Closed flag. */ + private volatile boolean closed; + + /** Flag set if stream was closed due to connection breakage. */ + private boolean connBroken; + + /** Error message. */ + private volatile String errMsg; + + /** Write time. */ + private long writeTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of written bytes. */ + private long total; + + /** + * Creates light output stream. + * + * @param delegate Server stream delegate. + * @param log Logger to use. + * @param clientLog Client logger. + * @param logStreamId Log stream ID. + */ + public IgfsHadoopOutputStream(IgfsHadoopStreamDelegate delegate, Log log, + IgfsLogger clientLog, long logStreamId) { + this.delegate = delegate; + this.log = log; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + + delegate.hadoop().addEventListener(delegate, this); + } + + /** + * Write start. + */ + private void writeStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Write end. + */ + private void writeEnd() { + long now = System.nanoTime(); + + writeTime += now - lastTs; + + lastTs = now; + } + + /** {@inheritDoc} */ + @Override public void write(@NotNull byte[] b, int off, int len) throws IOException { + check(); + + writeStart(); + + try { + delegate.hadoop().writeData(delegate, b, off, len); + + total += len; + } + finally { + writeEnd(); + } + } + + /** {@inheritDoc} */ + @Override public void write(int b) throws IOException { + // Delegates to write(byte[], int, int), which updates 'total'. + write(new byte[] {(byte)b}); + } + + /** {@inheritDoc} */ + @Override public void flush() throws IOException { + delegate.hadoop().flush(delegate); + } + + /** {@inheritDoc} */ + @Override public void close() throws IOException { + if (!closed) { + if (log.isDebugEnabled()) + log.debug("Closing output stream: " + delegate); + + writeStart(); + + delegate.hadoop().closeStream(delegate); + + markClosed(false); + + writeEnd(); + + if (clientLog.isLogEnabled()) + clientLog.logCloseOut(logStreamId, userTime, writeTime, total); + + if (log.isDebugEnabled()) + log.debug("Closed output stream [delegate=" + delegate + ", writeTime=" + writeTime / 1000 + + ", userTime=" + userTime / 1000 + ']'); + } + else if (connBroken) + throw new IOException( + "Failed to close stream, because connection was broken (data could have been lost)."); + } + + /** + * Marks stream as closed. + * + * @param connBroken {@code True} if connection with server was lost.
+ */ + private void markClosed(boolean connBroken) { + // It is OK to have a race here. + if (!closed) { + closed = true; + + delegate.hadoop().removeEventListener(delegate); + + this.connBroken = connBroken; + } + } + + /** + * @throws IOException If check failed. + */ + private void check() throws IOException { + String errMsg0 = errMsg; + + if (errMsg0 != null) + throw new IOException(errMsg0); + + if (closed) { + if (connBroken) + throw new IOException("Server connection was lost."); + else + throw new IOException("Stream is closed."); + } + } + + /** {@inheritDoc} */ + @Override public void onClose() throws IgniteCheckedException { + markClosed(true); + } + + /** {@inheritDoc} */ + @Override public void onError(String errMsg) { + this.errMsg = errMsg; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyInputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyInputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyInputStream.java new file mode 100644 index 0000000..330537d --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyInputStream.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.ignite.internal.igfs.common.*; + +import java.io.*; + +/** + * Secondary Hadoop file system input stream wrapper. + */ +public class IgfsHadoopProxyInputStream extends InputStream implements Seekable, PositionedReadable { + /** Actual input stream to the secondary file system. */ + private final FSDataInputStream is; + + /** Client logger. */ + private final IgfsLogger clientLog; + + /** Log stream ID. */ + private final long logStreamId; + + /** Read time. */ + private long readTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of read bytes. */ + private long total; + + /** Closed flag. */ + private boolean closed; + + /** + * Constructor. + * + * @param is Actual input stream to the secondary file system. + * @param clientLog Client log. + * @param logStreamId Log stream ID.
+ */ + public IgfsHadoopProxyInputStream(FSDataInputStream is, IgfsLogger clientLog, long logStreamId) { + assert is != null; + assert clientLog != null; + + this.is = is; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b) throws IOException { + readStart(); + + int res; + + try { + res = is.read(b); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int read(byte[] b, int off, int len) throws IOException { + readStart(); + + int res; + + try { + // Delegate directly to the wrapped stream; super.read() would loop over read() one byte at a time and double-count 'total'. + res = is.read(b, off, len); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized long skip(long n) throws IOException { + readStart(); + + long res; + + try { + res = is.skip(n); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logSkip(logStreamId, res); + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int available() throws IOException { + readStart(); + + try { + return is.available(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + if (!closed) { + closed = true; + + readStart(); + + try { + is.close(); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logCloseIn(logStreamId, userTime, readTime, total); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void mark(int readLimit) { + readStart(); + + try { + is.mark(readLimit); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logMark(logStreamId, readLimit); + } + + /** {@inheritDoc} */ + @Override public synchronized void reset() throws IOException { + readStart(); + + try { + is.reset(); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logReset(logStreamId); + } + + /** {@inheritDoc} */ + @Override public synchronized boolean markSupported() { + readStart(); + + try { + return is.markSupported(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized int read() throws IOException { + readStart(); + + int res; + + try { + res = is.read(); + } + finally { + readEnd(); + } + + if (res != -1) + total++; + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized int read(long pos, byte[] buf, int off, int len) throws IOException { + readStart(); + + int res; + + try { + res = is.read(pos, buf, off, len); + } + finally { + readEnd(); + } + + if (res != -1) + total += res; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, res); + + return res; + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long pos, byte[] buf, int off, int len) throws IOException { + readStart(); + + try { + is.readFully(pos, buf, off, len); + } + finally { + readEnd(); + } + + total += len; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, len); + } + + /** {@inheritDoc} */ + @Override public synchronized void readFully(long pos, byte[] buf) throws IOException { + readStart(); + + try { + is.readFully(pos, buf); + } + finally { + readEnd(); + } + + total += buf.length; + + if (clientLog.isLogEnabled()) + clientLog.logRandomRead(logStreamId, pos, buf.length); + } + + /** {@inheritDoc} */ + @Override public synchronized void seek(long pos) throws
IOException { + readStart(); + + try { + is.seek(pos); + } + finally { + readEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logSeek(logStreamId, pos); + } + + /** {@inheritDoc} */ + @Override public synchronized long getPos() throws IOException { + readStart(); + + try { + return is.getPos(); + } + finally { + readEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized boolean seekToNewSource(long targetPos) throws IOException { + readStart(); + + try { + return is.seekToNewSource(targetPos); + } + finally { + readEnd(); + } + } + + /** + * Read start. + */ + private void readStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Read end. + */ + private void readEnd() { + long now = System.nanoTime(); + + readTime += now - lastTs; + + lastTs = now; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyOutputStream.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyOutputStream.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyOutputStream.java new file mode 100644 index 0000000..41e80eb --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopProxyOutputStream.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.ignite.internal.igfs.common.*; + +import java.io.*; + +/** + * Secondary Hadoop file system output stream wrapper. + */ +public class IgfsHadoopProxyOutputStream extends OutputStream { + /** Actual output stream. */ + private FSDataOutputStream os; + + /** Client logger. */ + private final IgfsLogger clientLog; + + /** Log stream ID. */ + private final long logStreamId; + + /** Write time. */ + private long writeTime; + + /** User time. */ + private long userTime; + + /** Last timestamp. */ + private long lastTs; + + /** Amount of written bytes. */ + private long total; + + /** Closed flag. */ + private boolean closed; + + /** + * Constructor. + * + * @param os Actual output stream. + * @param clientLog Client logger. + * @param logStreamId Log stream ID.
+ */ + public IgfsHadoopProxyOutputStream(FSDataOutputStream os, IgfsLogger clientLog, long logStreamId) { + assert os != null; + assert clientLog != null; + + this.os = os; + this.clientLog = clientLog; + this.logStreamId = logStreamId; + + lastTs = System.nanoTime(); + } + + /** {@inheritDoc} */ + @Override public synchronized void write(int b) throws IOException { + writeStart(); + + try { + os.write(b); + } + finally { + writeEnd(); + } + + total++; + } + + /** {@inheritDoc} */ + @Override public synchronized void write(byte[] b) throws IOException { + writeStart(); + + try { + os.write(b); + } + finally { + writeEnd(); + } + + total += b.length; + } + + /** {@inheritDoc} */ + @Override public synchronized void write(byte[] b, int off, int len) throws IOException { + writeStart(); + + try { + os.write(b, off, len); + } + finally { + writeEnd(); + } + + total += len; + } + + /** {@inheritDoc} */ + @Override public synchronized void flush() throws IOException { + writeStart(); + + try { + os.flush(); + } + finally { + writeEnd(); + } + } + + /** {@inheritDoc} */ + @Override public synchronized void close() throws IOException { + if (!closed) { + closed = true; + + writeStart(); + + try { + os.close(); + } + finally { + writeEnd(); + } + + if (clientLog.isLogEnabled()) + clientLog.logCloseOut(logStreamId, userTime, writeTime, total); + } + } + + /** + * Write start. + */ + private void writeStart() { + long now = System.nanoTime(); + + userTime += now - lastTs; + + lastTs = now; + } + + /** + * Write end. + */ + private void writeEnd() { + long now = System.nanoTime(); + + writeTime += now - lastTs; + + lastTs = now; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/1428b796/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java ---------------------------------------------------------------------- diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java new file mode 100644 index 0000000..7234269 --- /dev/null +++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/igfs/hadoop/IgfsHadoopReader.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.igfs.hadoop; + +import org.apache.hadoop.fs.*; +import org.apache.hadoop.fs.FileSystem; +import org.apache.ignite.igfs.*; +import org.apache.ignite.internal.util.typedef.internal.*; + +import java.io.*; + +/** + * Secondary file system input stream wrapper which opens the underlying input stream only when it is explicitly + * requested. + * <p> + * The class is expected to be used only from a synchronized context and is therefore not thread-safe.
+ */ +public class IgfsHadoopReader implements IgfsReader { + /** Secondary file system. */ + private final FileSystem fs; + + /** Path to the file to open. */ + private final Path path; + + /** Buffer size. */ + private final int bufSize; + + /** Actual input stream. */ + private FSDataInputStream in; + + /** Cached error that occurred during input stream open. */ + private IOException err; + + /** Flag indicating that the stream was already opened. */ + private boolean opened; + + /** + * Constructor. + * + * @param fs Secondary file system. + * @param path Path to the file to open. + * @param bufSize Buffer size. + */ + IgfsHadoopReader(FileSystem fs, Path path, int bufSize) { + assert fs != null; + assert path != null; + + this.fs = fs; + this.path = path; + this.bufSize = bufSize; + } + + /** + * Gets the input stream, opening it on the first call; a failed open is cached and rethrown on later calls. + * + * @return Input stream. + * @throws IOException If the stream failed to open. + */ + private PositionedReadable in() throws IOException { + if (opened) { + if (err != null) + throw err; + } + else { + opened = true; + + try { + in = fs.open(path, bufSize); + + if (in == null) + throw new IOException("Failed to open input stream (file system returned null): " + path); + } + catch (IOException e) { + err = e; + + throw err; + } + } + + return in; + } + + /** + * Closes the wrapped input stream if it was previously opened. + */ + @Override public void close() { + U.closeQuiet(in); + } + + /** {@inheritDoc} */ + @Override public int read(long pos, byte[] buf, int off, int len) throws IOException { + return in().read(pos, buf, off, len); + } +}
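The lazy-open logic in IgfsHadoopReader above is a reusable pattern: acquire a resource only on first use, and cache a failed acquisition so every later call fails fast with the original error instead of retrying. Below is a minimal, self-contained sketch of just that pattern; the names (LazyOpenSketch, Source) are illustrative stand-ins, not part of this change.

import java.io.IOException;

/**
 * Sketch of the lazy-open pattern used by IgfsHadoopReader: acquire the underlying
 * resource on first use and cache an open failure so that later calls replay it.
 */
public class LazyOpenSketch {
    /** Stand-in for FileSystem.open(): may fail with IOException. */
    interface Source {
        AutoCloseable open() throws IOException;
    }

    private final Source src;

    /** Lazily opened handle. */
    private AutoCloseable handle;

    /** Cached error from a failed open attempt. */
    private IOException err;

    /** Flag indicating that open was already attempted. */
    private boolean opened;

    LazyOpenSketch(Source src) {
        this.src = src;
    }

    /** Opens on the first call; replays the cached failure on subsequent calls. */
    AutoCloseable handle() throws IOException {
        if (opened) {
            if (err != null)
                throw err; // Fail fast without retrying the open.
        }
        else {
            opened = true;

            try {
                handle = src.open();

                if (handle == null)
                    throw new IOException("Open returned null handle.");
            }
            catch (IOException e) {
                err = e; // Cache so every later call fails the same way.

                throw e;
            }
        }

        return handle;
    }
}

Caching the IOException matters here because a caller such as read(long, byte[], int, int) may be invoked many times after a failed open; without the cache, each call would hit the (possibly slow) secondary file system again.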
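All of the stream wrappers in this change account time the same way: every public operation is bracketed by readStart()/readEnd() (or writeStart()/writeEnd()), so the gap since the previous bracket is credited to caller code and the bracketed interval to I/O, and the two buckets always sum to the wall-clock time since construction — which is what close() hands to the client log. A compact sketch of just that bookkeeping, assuming single-threaded (externally synchronized) use as in the patch; the class and method names are ours:

/** Minimal sketch of the two-bucket time accounting used by the stream wrappers. */
public class TwoBucketTimer {
    /** Time spent inside bracketed operations (readTime/writeTime in the patch). */
    private long busyTime;

    /** Time spent between operations, i.e. in caller code (userTime in the patch). */
    private long userTime;

    /** Timestamp of the last bucket switch. */
    private long lastTs = System.nanoTime();

    /** Operation entry: everything since the previous switch belongs to the caller. */
    public void enter() {
        long now = System.nanoTime();

        userTime += now - lastTs;

        lastTs = now;
    }

    /** Operation exit: everything since enter() was spent doing I/O. */
    public void exit() {
        long now = System.nanoTime();

        busyTime += now - lastTs;

        lastTs = now;
    }
}

enter() and exit() correspond to readStart()/writeStart() and readEnd()/writeEnd(); the invariant busyTime + userTime == elapsed wall time holds at every bucket switch, so no separate total needs to be tracked.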
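Finally, closeStream() in the first hunk shows the request/response idiom used throughout this client: build a control message, send it, and block on the reply future, whose raw message is converted to a typed result by the closure that createClosure() builds, with server-side errors rethrown on the caller's thread. A simplified, runnable rendering of that conversion step; Response and ResponseConverterSketch are stand-ins for IgfsControlResponse and the GridPlainClosure machinery, not the actual API:

import java.io.IOException;
import java.util.concurrent.CompletableFuture;

/** Simplified sketch of the conversion closure built by createClosure(). */
public class ResponseConverterSketch {
    /** Stand-in for IgfsControlResponse: either a payload or a server-side error. */
    static final class Response {
        final Object payload;
        final String errMsg;

        Response(Object payload, String errMsg) {
            this.payload = payload;
            this.errMsg = errMsg;
        }
    }

    /** Waits for the raw reply, rethrows a server-side error, otherwise casts the payload. */
    @SuppressWarnings("unchecked")
    static <T> T convert(CompletableFuture<Response> fut) throws Exception {
        Response res = fut.get(); // Blocks until the server replies.

        if (res.errMsg != null)
            throw new IOException(res.errMsg); // Server-side failure surfaces on the caller's thread.

        return (T)res.payload;
    }

    public static void main(String[] args) throws Exception {
        Boolean ack = convert(CompletableFuture.completedFuture(new Response(Boolean.TRUE, null)));

        System.out.println("ack=" + ack);
    }
}

In the patch, the equivalent of convert() is the BOOL_RES closure chained in io.send(msg).chain(BOOL_RES).get().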
