http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java index 1f79067..7de8651 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java @@ -18,15 +18,15 @@ package org.apache.flink.runtime.io.disk.iomanager; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.util.EnvironmentInformation; + import java.io.IOException; import java.lang.Thread.UncaughtExceptionHandler; import java.util.List; import java.util.concurrent.LinkedBlockingQueue; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.util.EnvironmentInformation; - -import com.google.common.base.Preconditions; +import static com.google.common.base.Preconditions.checkState; /** * A version of the {@link IOManager} that uses asynchronous I/O. 
@@ -181,13 +181,13 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException { - Preconditions.checkState(!shutdown, "I/O-Manger is closed."); + checkState(!shutdown, "I/O-Manger is closed."); return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue); } @Override - public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException { - Preconditions.checkState(!shutdown, "I/O-Manger is closed."); + public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException { + checkState(!shutdown, "I/O-Manger is closed."); return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback); } @@ -205,7 +205,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID, LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException { - Preconditions.checkState(!shutdown, "I/O-Manger is closed."); + checkState(!shutdown, "I/O-Manger is closed."); return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue); } @@ -228,7 +228,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID, List<MemorySegment> targetSegments, int numBlocks) throws IOException { - Preconditions.checkState(!shutdown, "I/O-Manger is closed."); + checkState(!shutdown, "I/O-Manger is closed."); return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, 
targetSegments, numBlocks); }
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java index 1b7adae..69791f4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java @@ -33,7 +33,6 @@ interface IORequest { public void requestDone(IOException ioex); } - /** * Interface for I/O requests that are handled by the IOManager's reading thread. */ @@ -47,7 +46,6 @@ interface ReadRequest extends IORequest { public void read() throws IOException; } - /** * Interface for I/O requests that are handled by the IOManager's writing thread. */ http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java index 78699e2..95f3dc7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java @@ -26,7 +26,7 @@ import org.apache.flink.core.memory.MemorySegment; /** * A {@link RequestDoneCallback} that adds the memory segments to a blocking queue. 
*/ -public class QueuingCallback implements RequestDoneCallback { +public class QueuingCallback implements RequestDoneCallback<MemorySegment> { private final LinkedBlockingQueue<MemorySegment> queue; http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java index 982343c..f9a0965 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java @@ -20,17 +20,15 @@ package org.apache.flink.runtime.io.disk.iomanager; import java.io.IOException; -import org.apache.flink.core.memory.MemorySegment; - /** * Callback to be executed on completion of an asynchronous I/O request. - * Depending on success or failure, either method of - * {@ink #requestSuccessful(MemorySegment)} or {@link #requestFailed(MemorySegment, IOException)} - * is called. + * <p> + * Depending on success or failure, either {@link #requestSuccessful(Object)} + * or {@link #requestFailed(Object, IOException)} is called. 
*/ -public interface RequestDoneCallback { +public interface RequestDoneCallback<T> { + + void requestSuccessful(T request); - void requestSuccessful(MemorySegment buffer); - - void requestFailed(MemorySegment buffer, IOException e); + void requestFailed(T buffer, IOException e); } http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java deleted file mode 100644 index 60232da..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.flink.core.memory.MemorySegment; - -public class Buffer { - - private final MemorySegment memorySegment; - - private final BufferRecycler recycler; - - // ----------------------------------------------------------------------------------------------------------------- - - private final AtomicInteger referenceCounter; - - private int size; - - // ----------------------------------------------------------------------------------------------------------------- - - public Buffer(MemorySegment memorySegment, int size, BufferRecycler recycler) { - this.memorySegment = memorySegment; - this.size = size; - this.recycler = recycler; - - // we are the first, so we start with reference count of one - this.referenceCounter = new AtomicInteger(1); - } - - /** - * @param toDuplicate Buffer instance to duplicate - */ - private Buffer(Buffer toDuplicate) { - if (toDuplicate.referenceCounter.getAndIncrement() == 0) { - throw new IllegalStateException("Buffer was released before duplication."); - } - - this.memorySegment = toDuplicate.memorySegment; - this.size = toDuplicate.size; - this.recycler = toDuplicate.recycler; - this.referenceCounter = toDuplicate.referenceCounter; - } - - // ----------------------------------------------------------------------------------------------------------------- - - public MemorySegment getMemorySegment() { - return this.memorySegment; - } - - public int size() { - return this.size; - } - - public void limitSize(int size) { - if (size >= 0 && size <= this.memorySegment.size()) { - this.size = size; - } else { - throw new IllegalArgumentException(); - } - } - - public void recycleBuffer() { - int refCount = this.referenceCounter.decrementAndGet(); - if (refCount == 0) { - this.recycler.recycle(this.memorySegment); - } - } - - public Buffer duplicate() { - return new Buffer(this); - } - - public void copyToBuffer(Buffer 
destinationBuffer) { - if (size() > destinationBuffer.size()) { - throw new IllegalArgumentException("Destination buffer is too small to store content of source buffer."); - } - - this.memorySegment.copyTo(0, destinationBuffer.memorySegment, 0, size); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java deleted file mode 100644 index e48ca52..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network; - -import org.apache.flink.core.memory.MemorySegment; - -public interface BufferRecycler { - - /** - * Called by {@link org.apache.flink.runtime.io.network.Buffer} to return a {@link MemorySegment} to its original buffer pool. 
- * - * @param buffer the segment to be recycled - */ - void recycle(MemorySegment buffer); -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java deleted file mode 100644 index c841872..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java +++ /dev/null @@ -1,692 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network; - -import akka.actor.ActorRef; -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.runtime.AbstractID; -import org.apache.flink.runtime.akka.AkkaUtils; -import org.apache.flink.runtime.execution.CancelTaskException; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.execution.RuntimeEnvironment; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.instance.InstanceConnectionInfo; -import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider; -import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker; -import org.apache.flink.runtime.io.network.bufferprovider.DiscardBufferPool; -import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool; -import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner; -import org.apache.flink.runtime.io.network.channels.Channel; -import org.apache.flink.runtime.io.network.channels.ChannelID; -import org.apache.flink.runtime.io.network.channels.ChannelType; -import org.apache.flink.runtime.io.network.channels.InputChannel; -import org.apache.flink.runtime.io.network.channels.OutputChannel; -import org.apache.flink.runtime.io.network.gates.GateID; -import org.apache.flink.runtime.io.network.gates.InputGate; -import org.apache.flink.runtime.io.network.gates.OutputGate; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.runtime.messages.JobManagerMessages; -import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.util.ExceptionUtils; -import scala.concurrent.duration.FiniteDuration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; - -/** - * The channel manager sets up the network buffers and dispatches 
data between channels. - */ -public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker { - - private static final Logger LOG = LoggerFactory.getLogger(ChannelManager.class); - - private final ActorRef channelLookup; - - private final InstanceConnectionInfo connectionInfo; - - private final Map<ChannelID, Channel> channels; - - private final Map<AbstractID, LocalBufferPoolOwner> localBuffersPools; - - private final Map<ChannelID, EnvelopeReceiverList> receiverCache; - - private final GlobalBufferPool globalBufferPool; - - private final NetworkConnectionManager networkConnectionManager; - - private final InetSocketAddress ourAddress; - - private final DiscardBufferPool discardBufferPool; - - private final FiniteDuration timeout; - - // ----------------------------------------------------------------------------------------------------------------- - - public ChannelManager(ActorRef channelLookup, InstanceConnectionInfo connectionInfo, int numNetworkBuffers, - int networkBufferSize, NetworkConnectionManager networkConnectionManager, - FiniteDuration timeout) throws IOException { - - this.channelLookup= channelLookup; - this.connectionInfo = connectionInfo; - - this.timeout = timeout; - - try { - this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize); - } catch (Throwable e) { - throw new IOException("Failed to instantiate GlobalBufferPool.", e); - } - - this.networkConnectionManager = networkConnectionManager; - networkConnectionManager.start(this); - - // management data structures - this.channels = new ConcurrentHashMap<ChannelID, Channel>(); - this.receiverCache = new ConcurrentHashMap<ChannelID, EnvelopeReceiverList>(); - this.localBuffersPools = new ConcurrentHashMap<AbstractID, LocalBufferPoolOwner>(); - - this.ourAddress = new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()); - - // a special pool if the data is to be discarded - this.discardBufferPool = new DiscardBufferPool(); - } - - 
public void shutdown() throws IOException { - this.networkConnectionManager.shutdown(); - - this.globalBufferPool.destroy(); - } - - public GlobalBufferPool getGlobalBufferPool() { - return globalBufferPool; - } - - public NetworkConnectionManager getNetworkConnectionManager() { - return networkConnectionManager; - } - - // ----------------------------------------------------------------------------------------------------------------- - // Task registration - // ----------------------------------------------------------------------------------------------------------------- - - /** - * Registers the given task with the channel manager. - * - * @param task the task to be registered - * @throws InsufficientResourcesException thrown if not enough buffers available to safely run this task - */ - public void register(Task task) throws InsufficientResourcesException { - // Check if we can safely run this task with the given buffers - ensureBufferAvailability(task); - - RuntimeEnvironment environment = task.getEnvironment(); - - // ------------------------------------------------------------------------------------------------------------- - // Register output channels - // ------------------------------------------------------------------------------------------------------------- - - environment.registerGlobalBufferPool(this.globalBufferPool); - - if (this.localBuffersPools.containsKey(task.getExecutionId())) { - throw new IllegalStateException("Execution " + task.getExecutionId() + " has a previous buffer pool owner"); - } - - for (OutputGate gate : environment.outputGates()) { - // add receiver list hints - for (OutputChannel channel : gate.channels()) { - // register envelope dispatcher with the channel - channel.registerEnvelopeDispatcher(this); - - switch (channel.getChannelType()) { - case IN_MEMORY: - addReceiverListHint(channel.getID(), channel.getConnectedId()); - break; - case NETWORK: - addReceiverListHint(channel.getConnectedId(), channel.getID()); - break; 
- } - - this.channels.put(channel.getID(), channel); - } - } - - this.localBuffersPools.put(task.getExecutionId(), environment); - - // ------------------------------------------------------------------------------------------------------------- - // Register input channels - // ------------------------------------------------------------------------------------------------------------- - - // register global - for (InputGate<?> gate : environment.inputGates()) { - gate.registerGlobalBufferPool(this.globalBufferPool); - - for (int i = 0; i < gate.getNumberOfInputChannels(); i++) { - InputChannel<? extends IOReadableWritable> channel = gate.getInputChannel(i); - channel.registerEnvelopeDispatcher(this); - - if (channel.getChannelType() == ChannelType.IN_MEMORY) { - addReceiverListHint(channel.getID(), channel.getConnectedId()); - } - - this.channels.put(channel.getID(), channel); - } - - this.localBuffersPools.put(gate.getGateID(), gate); - } - - // the number of channels per buffers has changed after unregistering the task - // => redistribute the number of designated buffers of the registered local buffer pools - redistributeBuffers(); - } - - /** - * Unregisters the given task from the channel manager. 
- * - * @param executionId the ID of the task to be unregistered - * @param task the task to be unregistered - */ - public void unregister(ExecutionAttemptID executionId, Task task) { - final Environment environment = task.getEnvironment(); - if (environment == null) { - return; - } - - // destroy and remove OUTPUT channels from registered channels and cache - for (ChannelID id : environment.getOutputChannelIDs()) { - Channel channel = this.channels.remove(id); - if (channel != null) { - - channel.destroy(); - - removeFromReceiverCacheAndMaybeCloseTcpConnection(channel); - } - } - - // destroy and remove INPUT channels from registered channels and cache - for (ChannelID id : environment.getInputChannelIDs()) { - Channel channel = this.channels.remove(id); - if (channel != null) { - channel.destroy(); - - removeFromReceiverCacheAndMaybeCloseTcpConnection(channel); - } - } - - // clear and remove INPUT side buffer pools - for (GateID id : environment.getInputGateIDs()) { - LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(id); - if (bufferPool != null) { - bufferPool.clearLocalBufferPool(); - } - } - - // clear and remove OUTPUT side buffer pool - LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(executionId); - if (bufferPool != null) { - bufferPool.clearLocalBufferPool(); - } - - // the number of channels per buffers has changed after unregistering the task - // => redistribute the number of designated buffers of the registered local buffer pools - redistributeBuffers(); - } - - private void removeFromReceiverCacheAndMaybeCloseTcpConnection(Channel channel) { - EnvelopeReceiverList receiver = this.receiverCache.remove(channel.getID()); - - if (receiver != null && receiver.hasRemoteReceiver()) { - networkConnectionManager.close(receiver.getRemoteReceiver()); - } - } - - /** - * Ensures that the channel manager has enough buffers to execute the given task. 
- * <p> - * If there is less than one buffer per channel available, an InsufficientResourcesException will be thrown, - * because of possible deadlocks. With more then one buffer per channel, deadlock-freedom is guaranteed. - * - * @param task task to be executed - * @throws InsufficientResourcesException thrown if not enough buffers available to execute the task - */ - private void ensureBufferAvailability(Task task) throws InsufficientResourcesException { - Environment env = task.getEnvironment(); - - int numBuffers = this.globalBufferPool.numBuffers(); - // existing channels + channels of the task - int numChannels = this.channels.size() + env.getNumberOfOutputChannels() + env.getNumberOfInputChannels(); - - // need at least one buffer per channel - if (numChannels > 0 && numBuffers / numChannels < 1) { - String msg = String.format("%s has not enough buffers to safely execute %s (%d buffers missing)", - this.connectionInfo.getFQDNHostname(), env.getTaskName(), numChannels - numBuffers); - - throw new InsufficientResourcesException(msg); - } - } - - /** - * Redistributes the buffers among the registered buffer pools. This method is called after each task registration - * and unregistration. - * <p> - * Every registered buffer pool gets buffers according to its number of channels weighted by the current buffer to - * channel ratio. 
- */ - private void redistributeBuffers() { - if (this.localBuffersPools.isEmpty() | this.channels.size() == 0) { - return; - } - - int numBuffers = this.globalBufferPool.numBuffers(); - int numChannels = this.channels.size(); - - double buffersPerChannel = numBuffers / (double) numChannels; - - if (buffersPerChannel < 1.0) { - throw new RuntimeException("System has not enough buffers to execute tasks."); - } - - // redistribute number of designated buffers per buffer pool - for (LocalBufferPoolOwner bufferPool : this.localBuffersPools.values()) { - int numDesignatedBuffers = (int) Math.ceil(buffersPerChannel * bufferPool.getNumberOfChannels()); - bufferPool.setDesignatedNumberOfBuffers(numDesignatedBuffers); - } - } - - // ----------------------------------------------------------------------------------------------------------------- - // Envelope processing - // ----------------------------------------------------------------------------------------------------------------- - - private void releaseEnvelope(Envelope envelope) { - Buffer buffer = envelope.getBuffer(); - if (buffer != null) { - buffer.recycleBuffer(); - } - } - - private void addReceiverListHint(ChannelID source, ChannelID localReceiver) { - EnvelopeReceiverList receiverList = new EnvelopeReceiverList(localReceiver); - - if (this.receiverCache.put(source, receiverList) != null) { - LOG.warn("Receiver cache already contained entry for " + source); - } - } - - private void addReceiverListHint(ChannelID source, RemoteReceiver remoteReceiver) { - EnvelopeReceiverList receiverList = new EnvelopeReceiverList(remoteReceiver); - - if (this.receiverCache.put(source, receiverList) != null) { - LOG.warn("Receiver cache already contained entry for " + source); - } - } - - private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) throws IOException { - Channel channel = this.channels.get(envelope.getSource()); - if (channel == null) { - LOG.error("Cannot find channel for channel ID " + 
envelope.getSource()); - return; - } - - // Only generate sender hints for output channels - if (channel.isInputChannel()) { - return; - } - - final ChannelID targetChannelID = channel.getConnectedId(); - final int connectionIndex = receiver.getConnectionIndex(); - - final RemoteReceiver ourAddress = new RemoteReceiver(this.ourAddress, connectionIndex); - final Envelope senderHint = SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress); - - this.networkConnectionManager.enqueue(senderHint, receiver, true); - } - - /** - * Returns the list of receivers for transfer envelopes produced by the channel with the given source channel ID. - * - * @param jobID - * the ID of the job the given channel ID belongs to - * @param sourceChannelID - * the source channel ID for which the receiver list shall be retrieved - * @return the list of receivers or <code>null</code> if the receiver could not be determined - * @throws IOException - */ - private EnvelopeReceiverList getReceiverList(JobID jobID, ChannelID sourceChannelID, boolean reportException) throws IOException { - EnvelopeReceiverList receiverList = this.receiverCache.get(sourceChannelID); - - if (receiverList != null) { - return receiverList; - } - - while (true) { - ConnectionInfoLookupResponse lookupResponse; - lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup, - new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID, - sourceChannelID), timeout).response(); - - if (lookupResponse.receiverReady()) { - receiverList = new EnvelopeReceiverList(lookupResponse); - break; - } - else if (lookupResponse.receiverNotReady()) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - if (reportException) { - throw new IOException("Lookup was interrupted."); - } else { - return null; - } - } - } - else if (lookupResponse.isJobAborting()) { - if (reportException) { - throw new CancelTaskException(); - } else { - return null; - } - } - else if 
(lookupResponse.receiverNotFound()) { - if (reportException) { - throw new IOException("Could not find the receiver for Job " + jobID + ", channel with source id " + sourceChannelID); - } else { - return null; - } - } - else { - throw new IllegalStateException("Unrecognized response to channel lookup."); - } - } - - this.receiverCache.put(sourceChannelID, receiverList); - - if (LOG.isDebugEnabled()) { - LOG.debug(String.format("Receiver for %s: %s [%s])", - sourceChannelID, - receiverList.hasLocalReceiver() ? receiverList.getLocalReceiver() : receiverList.getRemoteReceiver(), - receiverList.hasLocalReceiver() ? "local" : "remote")); - } - - return receiverList; - } - - /** - * Invalidates the entries identified by the given channel IDs from the receiver lookup cache. - * - * @param channelIDs channel IDs for entries to invalidate - */ - public void invalidateLookupCacheEntries(Set<ChannelID> channelIDs) { - for (ChannelID id : channelIDs) { - this.receiverCache.remove(id); - } - } - - // ----------------------------------------------------------------------------------------------------------------- - // EnvelopeDispatcher methods - // ----------------------------------------------------------------------------------------------------------------- - - @Override - public void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException { - EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, true); - - Buffer srcBuffer = envelope.getBuffer(); - Buffer destBuffer = null; - - boolean success = false; - - try { - if (receiverList.hasLocalReceiver()) { - ChannelID receiver = receiverList.getLocalReceiver(); - Channel channel = this.channels.get(receiver); - - if (channel == null) { - throw new LocalReceiverCancelledException(receiver); - } - - if (!channel.isInputChannel()) { - throw new IOException("Local receiver " + receiver + " is not an input channel."); - } - - InputChannel<?> inputChannel = (InputChannel<?>) channel; - 
- // copy the buffer into the memory space of the receiver - if (srcBuffer != null) { - try { - destBuffer = inputChannel.requestBufferBlocking(srcBuffer.size()); - } catch (InterruptedException e) { - throw new IOException(e.getMessage()); - } - - srcBuffer.copyToBuffer(destBuffer); - envelope.setBuffer(destBuffer); - srcBuffer.recycleBuffer(); - } - - inputChannel.queueEnvelope(envelope); - success = true; - } - else if (receiverList.hasRemoteReceiver()) { - RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver(); - - // Generate sender hint before sending the first envelope over the network - if (envelope.getSequenceNumber() == 0) { - generateSenderHint(envelope, remoteReceiver); - } - - this.networkConnectionManager.enqueue(envelope, remoteReceiver, false); - success = true; - } - } finally { - if (!success) { - if (srcBuffer != null) { - srcBuffer.recycleBuffer(); - } - if (destBuffer != null) { - destBuffer.recycleBuffer(); - } - } - } - } - - @Override - public void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException { - // this method sends only events back from input channels to output channels - // sanity check that we have no buffer - if (envelope.getBuffer() != null) { - throw new RuntimeException("Error: This method can only process envelopes without buffers."); - } - - EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, true); - - if (receiverList.hasLocalReceiver()) { - ChannelID receiver = receiverList.getLocalReceiver(); - Channel channel = this.channels.get(receiver); - - if (channel == null) { - throw new LocalReceiverCancelledException(receiver); - } - - if (channel.isInputChannel()) { - throw new IOException("Local receiver " + receiver + " of backward event is not an output channel."); - } - - OutputChannel outputChannel = (OutputChannel) channel; - outputChannel.queueEnvelope(envelope); - } - else if (receiverList.hasRemoteReceiver()) { - RemoteReceiver remoteReceiver = 
receiverList.getRemoteReceiver(); - - // Generate sender hint before sending the first envelope over the network - this.networkConnectionManager.enqueue(envelope, remoteReceiver, envelope.getSequenceNumber() == 0); - } - } - - /** - * - */ - @Override - public void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException { - // ======================================================================================== - // IMPORTANT - // - // This method is called by the network I/O thread that reads the incoming TCP - // connections. This method must have minimal overhead and not throw exception if - // something is wrong with a job or individual transmission, but only when something - // is fundamentally broken in the system. - // ======================================================================================== - - // the sender hint event is to let the receiver know where exactly the envelope came from. - // the receiver will cache the sender id and its connection info in its local lookup table - // that allows the receiver to send envelopes to the sender without first pinging the job manager - // for the sender's connection info - - // Check if the envelope is the special envelope with the sender hint event - if (SenderHintEvent.isSenderHintEvent(envelope)) { - // Check if this is the final destination of the sender hint event before adding it - final SenderHintEvent seh = (SenderHintEvent) envelope.deserializeEvents().get(0); - if (this.channels.get(seh.getSource()) != null) { - addReceiverListHint(seh.getSource(), seh.getRemoteReceiver()); - return; - } - } - - // try and get the receiver list. 
if we cannot get it anymore, the task has been cleared - // the code frees the envelope on exception, so we need not to anything - EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, false); - if (receiverList == null) { - // receiver is cancelled and cleaned away - releaseEnvelope(envelope); - if (LOG.isDebugEnabled()) { - LOG.debug("Dropping envelope for cleaned up receiver."); - } - - return; - } - - if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) { - throw new IOException("Bug in network stack: Envelope dispatched from the incoming network pipe has no local receiver or has a remote receiver"); - } - - ChannelID localReceiver = receiverList.getLocalReceiver(); - Channel channel = this.channels.get(localReceiver); - - // if the channel is null, it means that receiver has been cleared already (cancelled or failed). - // release the buffer immediately - if (channel == null) { - releaseEnvelope(envelope); - if (LOG.isDebugEnabled()) { - LOG.debug("Dropping envelope for cancelled receiver " + localReceiver); - } - } - else { - channel.queueEnvelope(envelope); - } - } - - /** - * - * Upon an exception, this method frees the envelope. 
- */ - private final EnvelopeReceiverList getReceiverListForEnvelope(Envelope envelope, boolean reportException) throws IOException { - try { - return getReceiverList(envelope.getJobID(), envelope.getSource(), reportException); - } catch (IOException e) { - releaseEnvelope(envelope); - throw e; - } catch (CancelTaskException e) { - releaseEnvelope(envelope); - throw e; - } catch (Throwable t) { - releaseEnvelope(envelope); - ExceptionUtils.rethrow(t, "Error while requesting receiver list."); - return null; // silence the compiler - } - } - - // ----------------------------------------------------------------------------------------------------------------- - // BufferProviderBroker methods - // ----------------------------------------------------------------------------------------------------------------- - - @Override - public BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) throws IOException { - EnvelopeReceiverList receiverList = getReceiverList(jobID, sourceChannelID, false); - - // check if the receiver is already gone - if (receiverList == null) { - return this.discardBufferPool; - } - - if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) { - throw new IOException("The destination to be looked up is not a single local endpoint."); - } - - - ChannelID localReceiver = receiverList.getLocalReceiver(); - Channel channel = this.channels.get(localReceiver); - - if (channel == null) { - // receiver is already canceled - return this.discardBufferPool; - } - - if (!channel.isInputChannel()) { - throw new IOException("Channel context for local receiver " + localReceiver + " is not an input channel context"); - } - - return (InputChannel<?>) channel; - } - - // ----------------------------------------------------------------------------------------------------------------- - - public void logBufferUtilization() { - System.out.println("Buffer utilization at " + System.currentTimeMillis()); - - System.out.println("\tUnused 
global buffers: " + this.globalBufferPool.numAvailableBuffers()); - - System.out.println("\tLocal buffer pool status:"); - - for (LocalBufferPoolOwner bufferPool : this.localBuffersPools.values()) { - bufferPool.logBufferUtilization(); - } - - System.out.println("\tIncoming connections:"); - - for (Channel channel : this.channels.values()) { - if (channel.isInputChannel()) { - ((InputChannel<?>) channel).logQueuedEnvelopes(); - } - } - } - - public void verifyAllCachesEmpty() { - if (!channels.isEmpty()) { - throw new IllegalStateException("Channel manager caches not empty: There are still registered channels."); - } - if (!localBuffersPools.isEmpty()) { - throw new IllegalStateException("Channel manager caches not empty: There are still local buffer pools."); - } - if (!receiverCache.isEmpty()) { - throw new IllegalStateException("Channel manager caches not empty: There are still entries in the receiver cache."); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java deleted file mode 100644 index 51a0f94..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.io.network; - -import java.io.Serializable; - -import org.apache.flink.runtime.io.network.channels.ChannelID; - -public class ConnectionInfoLookupResponse implements Serializable { - - private static final long serialVersionUID = 3961171754642077522L; - - - private enum ReturnCode { - NOT_FOUND, FOUND_AND_RECEIVER_READY, FOUND_BUT_RECEIVER_NOT_READY, JOB_IS_ABORTING - }; - - // was request successful? - private ReturnCode returnCode; - - private RemoteReceiver remoteTarget; - - private ChannelID localTarget; - - public ConnectionInfoLookupResponse() {} - - public ConnectionInfoLookupResponse(ReturnCode code) { - this.returnCode = code; - this.remoteTarget = null; - this.localTarget = null; - } - - public ConnectionInfoLookupResponse(ReturnCode code, ChannelID localTarget) { - this.returnCode = code; - this.remoteTarget = null; - this.localTarget = localTarget; - } - - public ConnectionInfoLookupResponse(ReturnCode code, RemoteReceiver receiver) { - this.returnCode = code; - this.remoteTarget = receiver; - this.localTarget = null; - } - - public RemoteReceiver getRemoteTarget() { - return this.remoteTarget; - } - - public ChannelID getLocalTarget() { - return this.localTarget; - } - - public boolean receiverNotFound() { - return (this.returnCode == ReturnCode.NOT_FOUND); - } - - public boolean receiverNotReady() { - return (this.returnCode == ReturnCode.FOUND_BUT_RECEIVER_NOT_READY); - } - - public boolean receiverReady() { - return (this.returnCode == ReturnCode.FOUND_AND_RECEIVER_READY); - } - - public boolean 
isJobAborting() { - return (this.returnCode == ReturnCode.JOB_IS_ABORTING); - } - - - public static ConnectionInfoLookupResponse createReceiverFoundAndReady(ChannelID targetChannelID) { - return new ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, targetChannelID); - } - - public static ConnectionInfoLookupResponse createReceiverFoundAndReady(RemoteReceiver remoteReceiver) { - return new ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, remoteReceiver); - } - - public static ConnectionInfoLookupResponse createReceiverNotFound() { - return new ConnectionInfoLookupResponse(ReturnCode.NOT_FOUND); - } - - public static ConnectionInfoLookupResponse createReceiverNotReady() { - return new ConnectionInfoLookupResponse(ReturnCode.FOUND_BUT_RECEIVER_NOT_READY); - } - - public static ConnectionInfoLookupResponse createJobIsAborting() { - return new ConnectionInfoLookupResponse(ReturnCode.JOB_IS_ABORTING); - } - - - @Override - public String toString() { - return this.returnCode.name() + ", local target: " + this.localTarget + ", remoteTarget: " + this.remoteTarget; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java new file mode 100644 index 0000000..4a5536b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network; + +import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; +import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider; + +import java.io.IOException; + +/** + * The connection manager manages physical connections for the (logical) remote + * input channels at runtime. + */ +public interface ConnectionManager { + + void start(IntermediateResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException; + + /** + * Creates a {@link PartitionRequestClient} instance for the given {@link RemoteAddress}. 
+ */ + PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException; + + int getNumberOfActiveConnections(); + + void shutdown() throws IOException; +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java deleted file mode 100644 index 53f403c..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.runtime.io.network; - -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; - -import org.apache.flink.runtime.event.task.AbstractEvent; -import org.apache.flink.runtime.io.network.channels.ChannelID; -import org.apache.flink.runtime.io.network.serialization.DataInputDeserializer; -import org.apache.flink.runtime.io.network.serialization.DataOutputSerializer; -import org.apache.flink.runtime.jobgraph.JobID; -import org.apache.flink.util.InstantiationUtil; - -public final class Envelope { - - private final JobID jobID; - - private final ChannelID source; - - private final int sequenceNumber; - - private ByteBuffer serializedEventList; - - private Buffer buffer; - - public Envelope(int sequenceNumber, JobID jobID, ChannelID source) { - this.sequenceNumber = sequenceNumber; - this.jobID = jobID; - this.source = source; - } - - private Envelope(Envelope toDuplicate) { - this.jobID = toDuplicate.jobID; - this.source = toDuplicate.source; - this.sequenceNumber = toDuplicate.sequenceNumber; - this.serializedEventList = null; - this.buffer = null; - } - - public Envelope duplicate() { - Envelope duplicate = new Envelope(this); - if (hasBuffer()) { - duplicate.setBuffer(this.buffer.duplicate()); - } - - return duplicate; - } - - public Envelope duplicateWithoutBuffer() { - return new Envelope(this); - } - - public JobID getJobID() { - return this.jobID; - } - - public ChannelID getSource() { - return this.source; - } - - public int getSequenceNumber() { - return this.sequenceNumber; - } - - public void setEventsSerialized(ByteBuffer serializedEventList) { - if (this.serializedEventList != null) { - throw new IllegalStateException("Event list has already been set."); - } - - this.serializedEventList = serializedEventList; - } - - public void serializeEventList(List<? 
extends AbstractEvent> eventList) { - if (this.serializedEventList != null) { - throw new IllegalStateException("Event list has already been set."); - } - - this.serializedEventList = serializeEvents(eventList); - } - - public ByteBuffer getEventsSerialized() { - return this.serializedEventList; - } - - public List<? extends AbstractEvent> deserializeEvents() { - return deserializeEvents(getClass().getClassLoader()); - } - - public List<? extends AbstractEvent> deserializeEvents(ClassLoader classloader) { - if (this.serializedEventList == null) { - return Collections.emptyList(); - } - - try { - DataInputDeserializer deserializer = new DataInputDeserializer(this.serializedEventList); - - int numEvents = deserializer.readInt(); - ArrayList<AbstractEvent> events = new ArrayList<AbstractEvent>(numEvents); - - for (int i = 0; i < numEvents; i++) { - String className = deserializer.readUTF(); - Class<? extends AbstractEvent> clazz; - try { - clazz = Class.forName(className).asSubclass(AbstractEvent.class); - } catch (ClassNotFoundException e) { - throw new RuntimeException("Could not load event class '" + className + "'.", e); - } catch (ClassCastException e) { - throw new RuntimeException("The class '" + className + "' is no valid subclass of '" + AbstractEvent.class.getName() + "'.", e); - } - - AbstractEvent evt = InstantiationUtil.instantiate(clazz, AbstractEvent.class); - evt.read(deserializer); - - events.add(evt); - } - - return events; - } - catch (IOException e) { - throw new RuntimeException("Error while deserializing the events.", e); - } - } - - public void setBuffer(Buffer buffer) { - this.buffer = buffer; - } - - public Buffer getBuffer() { - return this.buffer; - } - - private ByteBuffer serializeEvents(List<? extends AbstractEvent> events) { - try { - // create the serialized event list - DataOutputSerializer serializer = events.size() == 0 - ? 
new DataOutputSerializer(4) - : new DataOutputSerializer(events.size() * 32); - serializer.writeInt(events.size()); - - for (AbstractEvent evt : events) { - serializer.writeUTF(evt.getClass().getName()); - evt.write(serializer); - } - - return serializer.wrapAsByteBuffer(); - } - catch (IOException e) { - throw new RuntimeException("Error while serializing the task events.", e); - } - } - - public boolean hasBuffer() { - return this.buffer != null; - } - - @Override - public String toString() { - return String.format("Envelope %d [source id: %s, buffer size: %d, events size: %d]", - this.sequenceNumber, this.getSource(), this.buffer == null ? -1 : this.buffer.size(), - this.serializedEventList == null ? -1 : this.serializedEventList.remaining()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java deleted file mode 100644 index 59cf491..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network; - -import java.io.IOException; - -/** - * A envelope dispatcher receives {@link Envelope}s and sends them to all of its destinations. - */ -public interface EnvelopeDispatcher { - - /** - * Dispatches an envelope from an output channel to the receiving input channels (forward flow). - * - * @param envelope envelope to be sent - */ - void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException; - - /** - * Dispatches an envelope from an input channel to the receiving output channels (backwards flow). - * - * @param envelope envelope to be sent - */ - void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException; - - /** - * Dispatches an envelope from an incoming TCP connection. - * <p> - * After an envelope has been constructed from a TCP socket, this method is called to send the envelope to the - * receiving input channel. 
- * - * @param envelope envelope to be sent - */ - void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException; -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java deleted file mode 100644 index f6e4982..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network; - -import java.net.InetAddress; - -import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse; -import org.apache.flink.runtime.io.network.RemoteReceiver; -import org.apache.flink.runtime.io.network.channels.ChannelID; - -/** - * A transfer envelope receiver list contains all recipients of a transfer envelope. 
Their are three different types of - * receivers: Local receivers identified by {@link ChannelID} objects, remote receivers identified by - * {@link InetAddress} objects and finally checkpoints which are identified by - * <p> - * This class is thread-safe. - * - */ -public class EnvelopeReceiverList { - - private final ChannelID localReceiver; - - private final RemoteReceiver remoteReceiver; - - public EnvelopeReceiverList(ConnectionInfoLookupResponse cilr) { - this.localReceiver = cilr.getLocalTarget(); - this.remoteReceiver = cilr.getRemoteTarget(); - } - - public EnvelopeReceiverList(ChannelID localReceiver) { - this.localReceiver = localReceiver; - this.remoteReceiver = null; - } - - public EnvelopeReceiverList(RemoteReceiver remoteReceiver) { - this.localReceiver = null; - this.remoteReceiver = remoteReceiver; - } - - public boolean hasLocalReceiver() { - return this.localReceiver != null; - } - - public boolean hasRemoteReceiver() { - return this.remoteReceiver != null; - } - - public int getTotalNumberOfReceivers() { - return (this.localReceiver == null ? 0 : 1) + (this.remoteReceiver == null ? 
0 : 1); - } - - public RemoteReceiver getRemoteReceiver() { - return this.remoteReceiver; - } - - public ChannelID getLocalReceiver() { - return this.localReceiver; - } - - @Override - public String toString() { - return "local receiver: " + this.localReceiver + ", remote receiver: " + this.remoteReceiver; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java deleted file mode 100644 index df4f4ec..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network; - -/** - * This exception is thrown by the {@link ChannelManager} to indicate that a task cannot be accepted because - * there are not enough resources available to safely execute it. 
- * - */ -public final class InsufficientResourcesException extends Exception { - - /** - * The generated serial version UID. - */ - private static final long serialVersionUID = -8977049569413215169L; - - /** - * Constructs a new insufficient resources exception. - * - * @param msg - * the message describing the exception - */ - InsufficientResourcesException(final String msg) { - super(msg); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java index 264fde6..894db35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java @@ -18,21 +18,24 @@ package org.apache.flink.runtime.io.network; -import java.io.IOException; +import org.apache.flink.runtime.io.network.netty.PartitionRequestClient; +import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider; -public class LocalConnectionManager implements NetworkConnectionManager { +import java.io.IOException; - @Override - public void start(ChannelManager channelManager) throws IOException { - } +/** + * A connection manager implementation to bypass setup overhead for task managers running in local + * execution mode. 
+ */ +public class LocalConnectionManager implements ConnectionManager { @Override - public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean isFirstEnvelope) throws IOException { + public void start(IntermediateResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException { } @Override - public void close(RemoteReceiver receiver) { - + public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException { + return null; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java deleted file mode 100644 index 84d2d80..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.io.network; - -import org.apache.flink.runtime.execution.CancelTaskException; -import org.apache.flink.runtime.io.network.channels.ChannelID; - - -/** - * - */ -public class LocalReceiverCancelledException extends CancelTaskException { - private static final long serialVersionUID = 1L; - - private final ChannelID receiver; - - public LocalReceiverCancelledException(ChannelID receiver) { - this.receiver = receiver; - } - - - public ChannelID getReceiver() { - return receiver; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java deleted file mode 100644 index 309c92d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - - -package org.apache.flink.runtime.io.network; - -import java.io.IOException; - -public interface NetworkConnectionManager { - - public void start(ChannelManager channelManager) throws IOException; - - public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean isFirstEnvelope) throws IOException; - - public void close(RemoteReceiver receiver); - - public int getNumberOfActiveConnections(); - - public void shutdown() throws IOException; -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java new file mode 100644 index 0000000..aa6c64c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network; + +import akka.actor.ActorRef; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.api.reader.BufferReader; +import org.apache.flink.runtime.io.network.api.writer.BufferWriter; +import org.apache.flink.runtime.io.network.buffer.BufferPool; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.netty.NettyConfig; +import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; +import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition; +import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionManager; +import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; +import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; +import scala.concurrent.duration.FiniteDuration; + +import java.io.IOException; + +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * Network I/O components of each {@link TaskManager} instance: the partition manager, the task event dispatcher, the network buffer pool and the connection manager. + */ +public class NetworkEnvironment { + + private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class); + + private final ActorRef jobManager; + + private final FiniteDuration jobManagerTimeout; + + private final IntermediateResultPartitionManager partitionManager; + + private final TaskEventDispatcher taskEventDispatcher; + + private final NetworkBufferPool networkBufferPool; + + private final ConnectionManager connectionManager; + /** Set at the end of {@link #shutdown()}; a plain field, this class applies no synchronization to it. */ + private boolean isShutdown; + + /** + * Initializes all network I/O components: creates the network buffer pool from the configured buffer count and size, then starts a Netty connection manager if a Netty configuration is defined and a local connection manager otherwise. A failure of either step is rethrown as an IOException that carries the original cause.
+ */ + public NetworkEnvironment(ActorRef jobManager, FiniteDuration jobManagerTimeout, NetworkEnvironmentConfiguration config) throws IOException { + this.jobManager = checkNotNull(jobManager); + this.jobManagerTimeout = checkNotNull(jobManagerTimeout); + + this.partitionManager = new IntermediateResultPartitionManager(); + this.taskEventDispatcher = new TaskEventDispatcher(); + + // -------------------------------------------------------------------- + // Network buffers + // -------------------------------------------------------------------- + try { + networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize()); + } + catch (Throwable t) { + throw new IOException("Failed to instantiate network buffer pool: " + t.getMessage(), t); + } + + // -------------------------------------------------------------------- + // Network connections + // -------------------------------------------------------------------- + final Option<NettyConfig> nettyConfig = config.nettyConfig(); + + connectionManager = nettyConfig.isDefined() ?
new NettyConnectionManager(nettyConfig.get()) : new LocalConnectionManager(); + + try { + connectionManager.start(partitionManager, taskEventDispatcher); + } + catch (Throwable t) { + throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t); + } + } + + public ActorRef getJobManager() { + return jobManager; + } + + public FiniteDuration getJobManagerTimeout() { + return jobManagerTimeout; + } + /** Registers the task with the network stack: creates one buffer pool per produced partition and per reader, registers each partition with the partition manager and each writer with the task event dispatcher. A failure is rethrown as IOException after the buffer pool of the failing partition or reader has been destroyed; an unequal number of writers and partitions raises IllegalStateException. */ + public void registerTask(Task task) throws IOException { + final ExecutionAttemptID executionId = task.getExecutionId(); + + final IntermediateResultPartition[] producedPartitions = task.getProducedPartitions(); + final BufferWriter[] writers = task.getWriters(); + + if (writers.length != producedPartitions.length) { + throw new IllegalStateException("Unequal number of writers and partitions."); + } + + for (int i = 0; i < producedPartitions.length; i++) { + final IntermediateResultPartition partition = producedPartitions[i]; + final BufferWriter writer = writers[i]; + + // Buffer pool for the partition + BufferPool bufferPool = null; + + try { + bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfQueues(), false); + partition.setBufferPool(bufferPool); + partitionManager.registerIntermediateResultPartition(partition); + } + catch (Throwable t) { + if (bufferPool != null) { + bufferPool.destroy(); + } + + if (t instanceof IOException) { + throw (IOException) t; + } + else { + throw new IOException(t.getMessage(), t); + } + } + + // Register writer with task event dispatcher + taskEventDispatcher.registerWriterForIncomingTaskEvents(executionId, writer.getPartitionId(), writer); + } + + // Setup the buffer pool for each buffer reader + final BufferReader[] readers = task.getReaders(); + + for (BufferReader reader : readers) { + BufferPool bufferPool = null; + + try { + bufferPool = networkBufferPool.createBufferPool(reader.getNumberOfInputChannels(), false); + reader.setBufferPool(bufferPool); + } + catch (Throwable t)
{ + if (bufferPool != null) { + bufferPool.destroy(); + } + + if (t instanceof IOException) { + throw (IOException) t; + } + else { + throw new IOException(t.getMessage(), t); + } + } + } + } + /** Unregisters the task: fails its intermediate result partitions if the task was canceled or failed, unregisters its writers from the task event dispatcher and releases the resources of every non-null reader. An IOException raised while releasing a reader is logged and not rethrown. */ + public void unregisterTask(Task task) { + if (LOG.isDebugEnabled()) { + LOG.debug("Unregistering task {} ({}) from network environment (state: {}).", task.getTaskNameWithSubtasks(), task.getExecutionId(), task.getExecutionState()); + } + + final ExecutionAttemptID executionId = task.getExecutionId(); + + if (task.isCanceledOrFailed()) { + partitionManager.failIntermediateResultPartitions(executionId); + } + + taskEventDispatcher.unregisterWriters(executionId); + + final BufferReader[] readers = task.getReaders(); + + if (readers != null) { + for (BufferReader reader : readers) { + try { + if (reader != null) { + reader.releaseAllResources(); + } + } + catch (IOException e) { + LOG.error("Error during release of reader resources: " + e.getMessage(), e); + } + } + } + } + + public IntermediateResultPartitionManager getPartitionManager() { + return partitionManager; + } + + public TaskEventDispatcher getTaskEventDispatcher() { + return taskEventDispatcher; + } + + public ConnectionManager getConnectionManager() { + return connectionManager; + } + + public NetworkBufferPool getNetworkBufferPool() { + return networkBufferPool; + } + /** Returns true if every memory segment is available in the network buffer pool again and no buffer pools, active connections or writers remain registered. The outcome is logged at debug level on success and at error level otherwise. */ + public boolean hasReleasedAllResources() { + String msg = String.format("Network buffer pool: %d missing memory segments. %d registered buffer pools. Connection manager: %d active connections. 
Task event dispatcher: %d registered writers.", + networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments(), networkBufferPool.getNumberOfRegisteredBufferPools(), connectionManager.getNumberOfActiveConnections(), taskEventDispatcher.getNumberOfRegisteredWriters()); + + boolean success = networkBufferPool.getTotalNumberOfMemorySegments() == networkBufferPool.getNumberOfAvailableMemorySegments() && + networkBufferPool.getNumberOfRegisteredBufferPools() == 0 && + connectionManager.getNumberOfActiveConnections() == 0 && + taskEventDispatcher.getNumberOfRegisteredWriters() == 0; + + if (success) { + String successMsg = "Network environment did release all resources: " + msg; + LOG.debug(successMsg); + } + else { + String errMsg = "Network environment did *not* release all resources: " + msg; + + LOG.error(errMsg); + } + + return success; + } + + /** + * Tries to shut down all network I/O components: destroys the network buffer pool, then shuts down the partition manager and the connection manager. A failure of one component is logged as a warning and does not stop the remaining ones; once a call has completed, later calls do nothing. + */ + public void shutdown() { + if (!isShutdown) { + try { + if (networkBufferPool != null) { + networkBufferPool.destroy(); + } + } + catch (Throwable t) { + LOG.warn("Network buffer pool did not shut down properly: " + t.getMessage(), t); + } + + if (partitionManager != null) { + try { + partitionManager.shutdown(); + } + catch (Throwable t) { + LOG.warn("Partition manager did not shut down properly: " + t.getMessage(), t); + } + } + + try { + if (connectionManager != null) { + connectionManager.shutdown(); + } + } + catch (Throwable t) { + LOG.warn("Network connection manager did not shut down properly: " + t.getMessage(), t); + } + + isShutdown = true; + } + } + /** Returns whether a call to {@link #shutdown()} has completed. */ + public boolean isShutdown() { + return isShutdown; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java ---------------------------------------------------------------------- diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java new file mode 100644 index 0000000..937055b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network; + +import java.io.IOException; +import java.io.Serializable; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +import org.apache.flink.core.io.IOReadableWritable; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.runtime.executiongraph.IntermediateResult; +import org.apache.flink.runtime.instance.InstanceConnectionInfo; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; + +/** + * A {@link RemoteAddress} identifies a connection to a remote task manager by + * the socket address and a connection index. This allows multiple connections + * to be distinguished by their connection index.
+ * <p> + * The connection index is assigned by the {@link IntermediateResult} and + * ensures that it is safe to multiplex multiple data transfers over the same + * physical TCP connection. + */ +public class RemoteAddress implements IOReadableWritable, Serializable { + + private InetSocketAddress address; + + private int connectionIndex; + /** Creates a remote address from the address and data port of the given connection info. */ + public RemoteAddress(InstanceConnectionInfo connectionInfo, int connectionIndex) { + this(new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex); + } + /** Creates a remote address; the socket address must not be null and the connection index must not be negative. */ + public RemoteAddress(InetSocketAddress address, int connectionIndex) { + this.address = checkNotNull(address); + checkArgument(connectionIndex >= 0); + this.connectionIndex = connectionIndex; + } + + public InetSocketAddress getAddress() { + return address; + } + + public int getConnectionIndex() { + return connectionIndex; + } + + @Override + public int hashCode() { + return address.hashCode() + (31 * connectionIndex); + } + /** Returns true only for another RemoteAddress with an equal socket address and connection index. A null argument or an instance of any other class compares unequal; the null check comes first so that equals(null) returns false instead of throwing a NullPointerException. */ + @Override + public boolean equals(Object other) { + if (other == null || other.getClass() != RemoteAddress.class) { + return false; + } + + final RemoteAddress ra = (RemoteAddress) other; + if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) { + return false; + } + + return true; + } + + @Override + public String toString() { + return address + " [" + connectionIndex + "]"; + } + + // ------------------------------------------------------------------------ + // Serialization + // ------------------------------------------------------------------------ + /** Creates an empty instance with a null address and connection index -1; {@link #read(DataInputView)} fills in both fields. */ + public RemoteAddress() { + this.address = null; + this.connectionIndex = -1; + } + /** Writes the length of the raw IP address, the raw IP address bytes, the port and the connection index, in that order. */ + @Override + public void write(final DataOutputView out) throws IOException { + final InetAddress ia = address.getAddress(); + out.writeInt(ia.getAddress().length); + out.write(ia.getAddress()); + out.writeInt(address.getPort()); + + out.writeInt(connectionIndex); + } + /** Reads the fields in the order produced by {@link #write(DataOutputView)} and replaces the address and connection index of this instance. */ + @Override + public void read(final DataInputView in) throws IOException { + final byte[]
addressBytes = new byte[in.readInt()]; + in.readFully(addressBytes); + + final InetAddress ia = InetAddress.getByAddress(addressBytes); + int port = in.readInt(); + + address = new InetSocketAddress(ia, port); + connectionIndex = in.readInt(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java deleted file mode 100644 index 436d07d..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License.
- */ - -package org.apache.flink.runtime.io.network; - -import java.io.IOException; -import java.io.Serializable; -import java.net.InetAddress; -import java.net.InetSocketAddress; - -import org.apache.flink.core.io.IOReadableWritable; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; - -/** - * Objects of this class uniquely identify a connection to a remote {@link org.apache.flink.runtime.taskmanager.TaskManager}. - */ -public final class RemoteReceiver implements IOReadableWritable, Serializable { - - private static final long serialVersionUID = 4304924747853162443L; - - /** - * The address of the connection to the remote TaskManager. - */ - private InetSocketAddress connectionAddress; - - /** - * The index of the connection to the remote TaskManager. - */ - private int connectionIndex; - - /** - * Constructs a new remote receiver object. - * - * @param connectionAddress - * the address of the connection to the remote TaskManager - * @param connectionIndex - * the index of the connection to the remote TaskManager - */ - public RemoteReceiver(final InetSocketAddress connectionAddress, final int connectionIndex) { - if (connectionAddress == null) { - throw new IllegalArgumentException("Argument connectionAddress must not be null"); - } - if (connectionIndex < 0) { - throw new IllegalArgumentException("Argument connectionIndex must be a non-negative integer number"); - } - - this.connectionAddress = connectionAddress; - this.connectionIndex = connectionIndex; - } - - /** - * Default constructor for serialization/deserialization. - */ - public RemoteReceiver() { - this.connectionAddress = null; - this.connectionIndex = -1; - } - - /** - * Returns the address of the connection to the remote TaskManager. 
- * - * @return the address of the connection to the remote TaskManager - */ - public InetSocketAddress getConnectionAddress() { - return this.connectionAddress; - } - - /** - * Returns the index of the connection to the remote TaskManager. - * - * @return the index of the connection to the remote TaskManager - */ - public int getConnectionIndex() { - return this.connectionIndex; - } - - - @Override - public int hashCode() { - return this.connectionAddress.hashCode() + (31 * this.connectionIndex); - } - - - @Override - public boolean equals(final Object obj) { - - if (!(obj instanceof RemoteReceiver)) { - return false; - } - - final RemoteReceiver rr = (RemoteReceiver) obj; - if (!this.connectionAddress.equals(rr.connectionAddress)) { - return false; - } - - if (this.connectionIndex != rr.connectionIndex) { - return false; - } - - return true; - } - - - @Override - public void write(final DataOutputView out) throws IOException { - - final InetAddress ia = this.connectionAddress.getAddress(); - out.writeInt(ia.getAddress().length); - out.write(ia.getAddress()); - out.writeInt(this.connectionAddress.getPort()); - - out.writeInt(this.connectionIndex); - } - - - @Override - public void read(final DataInputView in) throws IOException { - final int addr_length = in.readInt(); - final byte[] address = new byte[addr_length]; - in.readFully(address); - - InetAddress ia = InetAddress.getByAddress(address); - int port = in.readInt(); - this.connectionAddress = new InetSocketAddress(ia, port); - - this.connectionIndex = in.readInt(); - } - - - @Override - public String toString() { - return this.connectionAddress + " (" + this.connectionIndex + ")"; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java deleted file mode 100644 index 7cdbabc..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.runtime.io.network; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.runtime.event.task.AbstractEvent; -import org.apache.flink.runtime.io.network.channels.ChannelID; - -public final class SenderHintEvent extends AbstractEvent { - - /** - * The sequence number that will be set for transfer envelopes which contain the sender hint event. 
- */ - private static final int SENDER_HINT_SEQUENCE_NUMBER = 0; - - private final ChannelID source; - - private final RemoteReceiver remoteReceiver; - - SenderHintEvent(final ChannelID source, final RemoteReceiver remoteReceiver) { - - if (source == null) { - throw new IllegalArgumentException("Argument source must not be null"); - } - - if (remoteReceiver == null) { - throw new IllegalArgumentException("Argument remoteReceiver must not be null"); - } - - this.source = source; - this.remoteReceiver = remoteReceiver; - } - - public SenderHintEvent() { - - this.source = new ChannelID(); - this.remoteReceiver = new RemoteReceiver(); - } - - public ChannelID getSource() { - - return this.source; - } - - public RemoteReceiver getRemoteReceiver() { - - return this.remoteReceiver; - } - - - @Override - public void write(final DataOutputView out) throws IOException { - - this.source.write(out); - this.remoteReceiver.write(out); - } - - - @Override - public void read(final DataInputView in) throws IOException { - - this.source.read(in); - this.remoteReceiver.read(in); - } - - public static Envelope createEnvelopeWithEvent(final Envelope originalEnvelope, - final ChannelID source, final RemoteReceiver remoteReceiver) { - - final Envelope envelope = new Envelope(SENDER_HINT_SEQUENCE_NUMBER, - originalEnvelope.getJobID(), originalEnvelope.getSource()); - - final SenderHintEvent senderEvent = new SenderHintEvent(source, remoteReceiver); - envelope.serializeEventList(Arrays.asList(senderEvent)); - - return envelope; - } - - static boolean isSenderHintEvent(final Envelope envelope) { - - if (envelope.getSequenceNumber() != SENDER_HINT_SEQUENCE_NUMBER) { - return false; - } - - if (envelope.getBuffer() != null) { - return false; - } - - List<? extends AbstractEvent> events = envelope.deserializeEvents(); - - if (events.size() != 1) { - return false; - } - - if (!(events.get(0) instanceof SenderHintEvent)) { - return false; - } - - return true; - } -}