Repository: flink Updated Branches: refs/heads/master 1230bcaa0 -> 4883af675
Small cleanup to wrap some lines that are too long to read easily. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4883af67 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4883af67 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4883af67 Branch: refs/heads/master Commit: 4883af675e19d8a9c750a83b3f2c019583e6bf7f Parents: 1230bca Author: Henry Saputra <henry.sapu...@gmail.com> Authored: Tue Feb 24 10:57:33 2015 -0800 Committer: Henry Saputra <henry.sapu...@gmail.com> Committed: Tue Feb 24 10:58:09 2015 -0800 ---------------------------------------------------------------------- .../network/partition/IntermediateResultPartition.java | 12 ++++++++---- .../partition/queue/PipelinedPartitionQueue.java | 3 ++- 2 files changed, 10 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/4883af67/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java index 71af7a6..80bd38d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java @@ -213,7 +213,8 @@ public class IntermediateResultPartition implements BufferPoolOwner { // Consume // ------------------------------------------------------------------------ - public IntermediateResultPartitionQueueIterator getQueueIterator(int queueIndex, Optional<BufferProvider> bufferProvider) throws IOException { + public 
IntermediateResultPartitionQueueIterator getQueueIterator(int queueIndex, Optional<BufferProvider> bufferProvider) + throws IOException { synchronized (queues) { if (isReleased) { throw new IllegalQueueIteratorRequestException("Intermediate result partition has already been released."); @@ -231,7 +232,8 @@ public class IntermediateResultPartition implements BufferPoolOwner { @Override public String toString() { - return "Intermediate result partition " + partitionId + " [num queues: " + queues.length + ", " + (isFinished ? "finished" : "not finished") + "]"; + return "Intermediate result partition " + partitionId + " [num queues: " + queues.length + ", " + + (isFinished ? "finished" : "not finished") + "]"; } private void checkInProducePhase() { @@ -296,7 +298,8 @@ public class IntermediateResultPartition implements BufferPoolOwner { // ------------------------------------------------------------------------ - public static IntermediateResultPartition create(RuntimeEnvironment environment, int partitionIndex, JobID jobId, ExecutionAttemptID executionId, NetworkEnvironment networkEnvironment, PartitionDeploymentDescriptor desc) { + public static IntermediateResultPartition create(RuntimeEnvironment environment, int partitionIndex, JobID jobId, + ExecutionAttemptID executionId, NetworkEnvironment networkEnvironment, PartitionDeploymentDescriptor desc) { final IntermediateResultPartitionID partitionId = checkNotNull(desc.getPartitionId()); final IntermediateResultPartitionType partitionType = checkNotNull(desc.getPartitionType()); @@ -307,6 +310,7 @@ public class IntermediateResultPartition implements BufferPoolOwner { partitionQueues[i] = new PipelinedPartitionQueue(); } - return new IntermediateResultPartition(environment, partitionIndex, jobId, executionId, partitionId, partitionType, partitionQueues, networkEnvironment); + return new IntermediateResultPartition(environment, partitionIndex, jobId, executionId, partitionId, partitionType, + partitionQueues, 
networkEnvironment); } } http://git-wip-us.apache.org/repos/asf/flink/blob/4883af67/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java index a24cdeb..5d562e4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueue.java @@ -103,7 +103,8 @@ public class PipelinedPartitionQueue implements IntermediateResultPartitionQueue } @Override - public IntermediateResultPartitionQueueIterator getQueueIterator(Optional<BufferProvider> bufferProvider) throws IllegalQueueIteratorRequestException { + public IntermediateResultPartitionQueueIterator getQueueIterator(Optional<BufferProvider> bufferProvider) + throws IllegalQueueIteratorRequestException { synchronized (queue) { if (hasBeenDiscarded) { throw new IllegalQueueIteratorRequestException("Queue has been discarded during produce phase.");