azagrebin commented on a change in pull request #8416: [FLINK-12331] Introduce
partition/gate setup to decouple task registration with NetworkEnvironment
URL: https://github.com/apache/flink/pull/8416#discussion_r285342718
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
##########
@@ -88,35 +86,62 @@
private final TaskEventPublisher taskEventPublisher;
- private final IOManager ioManager;
+ private final ResultPartitionFactory resultPartitionFactory;
+
+ private final SingleInputGateFactory singleInputGateFactory;
private boolean isShutdown;
- public NetworkEnvironment(
- NetworkEnvironmentConfiguration config,
- TaskEventPublisher taskEventPublisher,
- MetricGroup metricGroup,
- IOManager ioManager) {
- this.config = checkNotNull(config);
+ private NetworkEnvironment(
+ NetworkEnvironmentConfiguration config,
+ NetworkBufferPool networkBufferPool,
+ ConnectionManager connectionManager,
+ ResultPartitionManager resultPartitionManager,
+ TaskEventPublisher taskEventPublisher,
+ ResultPartitionFactory resultPartitionFactory,
+ SingleInputGateFactory singleInputGateFactory) {
+
+ this.config = config;
+ this.networkBufferPool = networkBufferPool;
+ this.connectionManager = connectionManager;
+ this.resultPartitionManager = resultPartitionManager;
+ this.taskEventPublisher = taskEventPublisher;
+ this.resultPartitionFactory = resultPartitionFactory;
+ this.singleInputGateFactory = singleInputGateFactory;
+ this.isShutdown = false;
+ }
- this.networkBufferPool = new
NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
+ public static NetworkEnvironment create(
+ NetworkEnvironmentConfiguration config,
+ TaskEventPublisher taskEventPublisher,
+ MetricGroup metricGroup,
+ IOManager ioManager) {
- NettyConfig nettyConfig = config.nettyConfig();
- if (nettyConfig != null) {
- this.connectionManager = new
NettyConnectionManager(nettyConfig, config.isCreditBased());
- } else {
- this.connectionManager = new LocalConnectionManager();
- }
+ NettyConfig nettyConfig = checkNotNull(config).nettyConfig();
+ ConnectionManager connectionManager = nettyConfig != null ?
+ new NettyConnectionManager(nettyConfig,
config.isCreditBased()) : new LocalConnectionManager();
- this.resultPartitionManager = new ResultPartitionManager();
-
- this.taskEventPublisher = checkNotNull(taskEventPublisher);
+ NetworkBufferPool networkBufferPool = new NetworkBufferPool(
+ config.numNetworkBuffers(), config.networkBufferSize(),
config.networkBuffersPerChannel());
registerNetworkMetrics(metricGroup, networkBufferPool);
- this.ioManager = checkNotNull(ioManager);
-
- isShutdown = false;
+ ResultPartitionManager resultPartitionManager = new
ResultPartitionManager();
+ ResultPartitionFactory resultPartitionFactory = new
ResultPartitionFactory(
+ resultPartitionManager, checkNotNull(ioManager),
networkBufferPool,
+ config.networkBuffersPerChannel(),
config.floatingNetworkBuffersPerGate());
Review comment:
In general, I would prefer to keep classes/functions to have as least
dependencies as possible if full `config` is not needed, it is easier to see
the real dependencies and use in other places, like avoid mocking full config
in tests.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services