[
https://issues.apache.org/jira/browse/SPARK-6235?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15420554#comment-15420554
]
Guoqiang Li edited comment on SPARK-6235 at 8/15/16 1:53 AM:
-------------------------------------------------------------
[~hvanhovell]
The main changes:
1. Replace the DiskStore method {{def getBytes(blockId: BlockId): ChunkedByteBuffer}} with {{def getBlockData(blockId: BlockId): ManagedBuffer}}.
2. Change ManagedBuffer's nioByteBuffer method to return a ChunkedByteBuffer.
3. Add the class {{ChunkFetchInputStream}}, which is used for flow control. The code is as follows:
{noformat}
package org.apache.spark.network.client;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.primitives.UnsignedBytes;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.buffer.ChunkedByteBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.network.util.TransportFrameDecoder;
public class ChunkFetchInputStream extends InputStream {
private final Logger logger =
LoggerFactory.getLogger(ChunkFetchInputStream.class);
private final TransportResponseHandler handler;
private final Channel channel;
private final StreamChunkId streamId;
private final long byteCount;
private final ChunkReceivedCallback callback;
private final LinkedBlockingQueue<ByteBuf> buffers = new
LinkedBlockingQueue<>(1024);
public final TransportFrameDecoder.Interceptor interceptor;
private ByteBuf curChunk;
private boolean isCallbacked = false;
private long writerIndex = 0;
private final AtomicReference<Throwable> cause = new AtomicReference<>(null);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
public ChunkFetchInputStream(
TransportResponseHandler handler,
Channel channel,
StreamChunkId streamId,
long byteCount,
ChunkReceivedCallback callback) {
this.handler = handler;
this.channel = channel;
this.streamId = streamId;
this.byteCount = byteCount;
this.callback = callback;
this.interceptor = new StreamInterceptor();
}
@Override
public int read() throws IOException {
if (isClosed.get()) return -1;
pullChunk();
if (curChunk != null) {
byte b = curChunk.readByte();
return UnsignedBytes.toInt(b);
} else {
return -1;
}
}
@Override
public int read(byte[] dest, int offset, int length) throws IOException {
if (isClosed.get()) return -1;
pullChunk();
if (curChunk != null) {
int amountToGet = Math.min(curChunk.readableBytes(), length);
curChunk.readBytes(dest, offset, amountToGet);
return amountToGet;
} else {
return -1;
}
}
@Override
public long skip(long bytes) throws IOException {
if (isClosed.get()) return 0L;
pullChunk();
if (curChunk != null) {
int amountToSkip = (int) Math.min(bytes, curChunk.readableBytes());
curChunk.skipBytes(amountToSkip);
return amountToSkip;
} else {
return 0L;
}
}
@Override
public void close() throws IOException {
if (!isClosed.get()) {
releaseCurChunk();
isClosed.set(true);
resetChannel();
Iterator<ByteBuf> itr = buffers.iterator();
while (itr.hasNext()) {
itr.next().release();
}
buffers.clear();
}
}
private void pullChunk() throws IOException {
if (curChunk != null && !curChunk.isReadable()) releaseCurChunk();
if (curChunk == null && cause.get() == null && !isClosed.get()) {
try {
curChunk = buffers.take();
// if channel.read() will be not invoked automatically,
// the method is called by here
if (!channel.config().isAutoRead()) channel.read();
} catch (Throwable e) {
setCause(e);
}
}
if (cause.get() != null) throw new IOException(cause.get());
}
private void setCause(Throwable e) {
if (cause.get() == null) cause.set(e);
}
private void releaseCurChunk() {
if (curChunk != null) {
curChunk.release();
curChunk = null;
}
}
private void onSuccess() throws IOException {
if (isCallbacked) return;
if (cause.get() != null) {
callback.onFailure(streamId.chunkIndex, cause.get());
} else {
InputStream inputStream = new LimitedInputStream(this, byteCount);
ManagedBuffer managedBuffer = new InputStreamManagedBuffer(inputStream,
byteCount);
callback.onSuccess(streamId.chunkIndex, managedBuffer);
}
isCallbacked = true;
}
private void resetChannel() {
if (!channel.config().isAutoRead()) {
channel.config().setAutoRead(true);
channel.read();
}
}
private class StreamInterceptor implements TransportFrameDecoder.Interceptor {
@Override
public void exceptionCaught(Throwable e) throws Exception {
handler.deactivateStream();
setCause(e);
logger.trace("exceptionCaught", e);
onSuccess();
resetChannel();
}
@Override
public void channelInactive() throws Exception {
handler.deactivateStream();
setCause(new ClosedChannelException());
logger.trace("channelInactive", cause.get());
onSuccess();
resetChannel();
}
@Override
public boolean handle(ByteBuf buf) throws Exception {
try {
ByteBuf frame = nextBufferForFrame(byteCount - writerIndex, buf);
int available = frame.readableBytes();
writerIndex += available;
mayTrafficSuspension();
if (!isClosed.get() && available > 0) {
buffers.put(frame);
if (writerIndex > byteCount) {
setCause(new IllegalStateException(String.format(
"Read too many bytes? Expected %d, but read %d.", byteCount,
writerIndex)));
handler.deactivateStream();
} else if (writerIndex == byteCount) {
handler.deactivateStream();
}
} else {
frame.release();
}
logger.trace(streamId + ", writerIndex " + writerIndex + " byteCount,
" + byteCount);
onSuccess();
} catch (Exception e) {
setCause(e);
resetChannel();
}
return writerIndex != byteCount;
}
/**
* Takes the first buffer in the internal list, and either adjust it to fit
in the frame
* (by taking a slice out of it) or remove it from the internal list.
*/
private ByteBuf nextBufferForFrame(long bytesToRead, ByteBuf buf) {
int slen = (int) Math.min(buf.readableBytes(), bytesToRead);
ByteBuf frame;
if (slen == buf.readableBytes()) {
frame = buf.retain().readSlice(slen);
} else {
frame = buf.alloc().buffer(slen);
buf.readBytes(frame);
frame.retain();
}
return frame;
}
private void mayTrafficSuspension() {
// If there is too much cached chunk, to manually call channel.read().
if (channel.config().isAutoRead() && buffers.size() > 31) {
channel.config().setAutoRead(false);
}
if (writerIndex >= byteCount) resetChannel();
}
}
private class InputStreamManagedBuffer extends ManagedBuffer {
private final InputStream inputStream;
private final long byteCount;
InputStreamManagedBuffer(InputStream inputStream, long byteCount) {
this.inputStream = inputStream;
this.byteCount = byteCount;
}
public long size() {
return byteCount;
}
public ChunkedByteBuffer nioByteBuffer() throws IOException {
throw new UnsupportedOperationException("nioByteBuffer");
}
public InputStream createInputStream() throws IOException {
return inputStream;
}
public ManagedBuffer retain() {
// throw new UnsupportedOperationException("retain");
return this;
}
public ManagedBuffer release() {
// throw new UnsupportedOperationException("release");
return this;
}
public Object convertToNetty() throws IOException {
throw new UnsupportedOperationException("convertToNetty");
}
}
}
{noformat}
was (Author: gq):
[~hvanhovell]
The main changes:
1. Replace the DiskStore method {{def getBytes(blockId: BlockId): ChunkedByteBuffer}} with {{def getBlockData(blockId: BlockId): ManagedBuffer}}.
2. Change ManagedBuffer's nioByteBuffer method to return a ChunkedByteBuffer.
3. Add the class {{ChunkFetchInputStream}}, which is used for flow control. The code is as follows:
{noformat}
package org.apache.spark.network.client;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ClosedChannelException;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import com.google.common.primitives.UnsignedBytes;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.spark.network.buffer.ChunkedByteBuffer;
import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.protocol.StreamChunkId;
import org.apache.spark.network.util.LimitedInputStream;
import org.apache.spark.network.util.TransportFrameDecoder;
public class ChunkFetchInputStream extends InputStream {
private final Logger logger =
LoggerFactory.getLogger(ChunkFetchInputStream.class);
private final TransportResponseHandler handler;
private final Channel channel;
private final StreamChunkId streamId;
private final long byteCount;
private final ChunkReceivedCallback callback;
private final LinkedBlockingQueue<ByteBuf> buffers = new
LinkedBlockingQueue<>(1024);
public final TransportFrameDecoder.Interceptor interceptor;
private ByteBuf curChunk;
private boolean isCallbacked = false;
private long writerIndex = 0;
private final AtomicReference<Throwable> cause = new AtomicReference<>(null);
private final AtomicBoolean isClosed = new AtomicBoolean(false);
public ChunkFetchInputStream(
TransportResponseHandler handler,
Channel channel,
StreamChunkId streamId,
long byteCount,
ChunkReceivedCallback callback) {
this.handler = handler;
this.channel = channel;
this.streamId = streamId;
this.byteCount = byteCount;
this.callback = callback;
this.interceptor = new StreamInterceptor();
}
@Override
public int read() throws IOException {
if (isClosed.get()) return -1;
pullChunk();
if (curChunk != null) {
byte b = curChunk.readByte();
return UnsignedBytes.toInt(b);
} else {
return -1;
}
}
@Override
public int read(byte[] dest, int offset, int length) throws IOException {
if (isClosed.get()) return -1;
pullChunk();
if (curChunk != null) {
int amountToGet = Math.min(curChunk.readableBytes(), length);
curChunk.readBytes(dest, offset, amountToGet);
return amountToGet;
} else {
return -1;
}
}
@Override
public long skip(long bytes) throws IOException {
if (isClosed.get()) return 0L;
pullChunk();
if (curChunk != null) {
int amountToSkip = (int) Math.min(bytes, curChunk.readableBytes());
curChunk.skipBytes(amountToSkip);
return amountToSkip;
} else {
return 0L;
}
}
@Override
public void close() throws IOException {
if (!isClosed.get()) {
releaseCurChunk();
isClosed.set(true);
resetChannel();
Iterator<ByteBuf> itr = buffers.iterator();
while (itr.hasNext()) {
itr.next().release();
}
buffers.clear();
}
}
private void pullChunk() throws IOException {
if (curChunk != null && !curChunk.isReadable()) releaseCurChunk();
if (curChunk == null && cause.get() == null && !isClosed.get()) {
try {
curChunk = buffers.take();
// if channel.read() will be not invoked automatically,
// the method is called by here
if (!channel.config().isAutoRead()) channel.read();
} catch (Throwable e) {
setCause(e);
}
}
if (cause.get() != null) throw new IOException(cause.get());
}
private void setCause(Throwable e) {
if (cause.get() == null) cause.set(e);
}
private void releaseCurChunk() {
if (curChunk != null) {
curChunk.release();
curChunk = null;
}
}
private void onSuccess() throws IOException {
if (isCallbacked) return;
if (cause.get() != null) {
callback.onFailure(streamId.chunkIndex, cause.get());
} else {
InputStream inputStream = new LimitedInputStream(this, byteCount);
ManagedBuffer managedBuffer = new InputStreamManagedBuffer(inputStream,
byteCount);
callback.onSuccess(streamId.chunkIndex, managedBuffer);
}
isCallbacked = true;
}
private void resetChannel() {
if (!channel.config().isAutoRead()) {
channel.config().setAutoRead(true);
channel.read();
}
}
private class StreamInterceptor implements TransportFrameDecoder.Interceptor {
@Override
public void exceptionCaught(Throwable e) throws Exception {
handler.deactivateStream();
setCause(e);
logger.trace("exceptionCaught", e);
onSuccess();
resetChannel();
}
@Override
public void channelInactive() throws Exception {
handler.deactivateStream();
setCause(new ClosedChannelException());
logger.trace("channelInactive", cause.get());
onSuccess();
resetChannel();
}
@Override
public boolean handle(ByteBuf buf) throws Exception {
try {
ByteBuf frame = nextBufferForFrame(byteCount - writerIndex, buf);
int available = frame.readableBytes();
writerIndex += available;
mayTrafficSuspension();
if (!isClosed.get() && available > 0) {
buffers.put(frame);
if (writerIndex > byteCount) {
setCause(new IllegalStateException(String.format(
"Read too many bytes? Expected %d, but read %d.", byteCount,
writerIndex)));
handler.deactivateStream();
} else if (writerIndex == byteCount) {
handler.deactivateStream();
}
} else {
frame.release();
}
logger.trace(streamId + ", writerIndex " + writerIndex + " byteCount,
" + byteCount);
onSuccess();
} catch (Exception e) {
setCause(e);
resetChannel();
}
return writerIndex != byteCount;
}
/**
* Takes the first buffer in the internal list, and either adjust it to fit
in the frame
* (by taking a slice out of it) or remove it from the internal list.
*/
private ByteBuf nextBufferForFrame(long bytesToRead, ByteBuf buf) {
int slen = (int) Math.min(buf.readableBytes(), bytesToRead);
ByteBuf frame;
if (slen == buf.readableBytes()) {
frame = buf.retain().readSlice(slen);
} else {
frame = buf.alloc().buffer(slen);
buf.readBytes(frame);
frame.retain();
}
return frame;
}
private void mayTrafficSuspension() {
// If there is too much cached chunk, to manually call channel.read().
if (channel.config().isAutoRead() && buffers.size() > 31) {
channel.config().setAutoRead(false);
}
if (writerIndex >= byteCount) resetChannel();
}
}
private class InputStreamManagedBuffer extends ManagedBuffer {
private final InputStream inputStream;
private final long byteCount;
InputStreamManagedBuffer(InputStream inputStream, long byteCount) {
this.inputStream = inputStream;
this.byteCount = byteCount;
}
public long size() {
return byteCount;
}
public ChunkedByteBuffer nioByteBuffer() throws IOException {
throw new UnsupportedOperationException("nioByteBuffer");
}
public InputStream createInputStream() throws IOException {
return inputStream;
}
public ManagedBuffer retain() {
// throw new UnsupportedOperationException("retain");
return this;
}
public ManagedBuffer release() {
// throw new UnsupportedOperationException("release");
return this;
}
public Object convertToNetty() throws IOException {
throw new UnsupportedOperationException("convertToNetty");
}
}
}
{noformat}
> Address various 2G limits
> -------------------------
>
> Key: SPARK-6235
> URL: https://issues.apache.org/jira/browse/SPARK-6235
> Project: Spark
> Issue Type: Umbrella
> Components: Shuffle, Spark Core
> Reporter: Reynold Xin
>
> An umbrella ticket to track the various 2G limits we have in Spark, due to the
> use of byte arrays and ByteBuffers.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]