[FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/67bd8277
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/67bd8277
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/67bd8277

Branch: refs/heads/master
Commit: 67bd8277d1dc1179c30d2dbad0922122ed6f49ee
Parents: dc5650a
Author: Ufuk Celebi <u...@apache.org>
Authored: Tue Nov 29 16:04:48 2016 +0100
Committer: Ufuk Celebi <u...@apache.org>
Committed: Thu Dec 1 10:44:23 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java         |  3 ---
 .../partition/consumer/SingleInputGate.java       | 18 +++++++++++++++---
 2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/67bd8277/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index a72b92f..9b3ce5f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -35,7 +35,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -143,8 +142,6 @@ public class InputChannelDeploymentDescriptor implements Serializable {
                                        consumedPartitionId, partitionLocation);
                }
 
-               LOG.debug("Created {} from edges {}.", Arrays.toString(icdd), 
Arrays.toString(edges));
-
                return icdd;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/67bd8277/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 8f57542..d7ed33c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -45,7 +45,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
@@ -554,8 +553,11 @@ public class SingleInputGate implements InputGate {
                // Create the input channels. There is one input channel for 
each consumed partition.
                final InputChannel[] inputChannels = new 
InputChannel[icdd.length];
 
-               for (int i = 0; i < inputChannels.length; i++) {
+               int numLocalChannels = 0;
+               int numRemoteChannels = 0;
+               int numUnknownChannels = 0;
 
+               for (int i = 0; i < inputChannels.length; i++) {
                        final ResultPartitionID partitionId = 
icdd[i].getConsumedPartitionId();
                        final ResultPartitionLocation partitionLocation = 
icdd[i].getConsumedPartitionLocation();
 
@@ -567,6 +569,8 @@ public class SingleInputGate implements InputGate {
                                        
networkEnvironment.getPartitionRequestMaxBackoff(),
                                        metrics
                                );
+
+                               numLocalChannels++;
                        }
                        else if (partitionLocation.isRemote()) {
                                inputChannels[i] = new 
RemoteInputChannel(inputGate, i, partitionId,
@@ -576,6 +580,8 @@ public class SingleInputGate implements InputGate {
                                        
networkEnvironment.getPartitionRequestMaxBackoff(),
                                        metrics
                                );
+
+                               numRemoteChannels++;
                        }
                        else if (partitionLocation.isUnknown()) {
                                inputChannels[i] = new 
UnknownInputChannel(inputGate, i, partitionId,
@@ -586,6 +592,8 @@ public class SingleInputGate implements InputGate {
                                        
networkEnvironment.getPartitionRequestMaxBackoff(),
                                        metrics
                                );
+
+                               numUnknownChannels++;
                        }
                        else {
                                throw new IllegalStateException("Unexpected 
partition location.");
@@ -594,7 +602,11 @@ public class SingleInputGate implements InputGate {
                        inputGate.setInputChannel(partitionId.getPartitionId(), 
inputChannels[i]);
                }
 
-               LOG.debug("Created input channels {} from {}.", 
Arrays.toString(inputChannels), igdd);
+               LOG.debug("Created {} input channels (local: {}, remote: {}, 
unknown: {}).",
+                       inputChannels.length,
+                       numLocalChannels,
+                       numRemoteChannels,
+                       numUnknownChannels);
 
                return inputGate;
        }

Reply via email to