This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0ee4038ef596b22630bc814f677fa489d3796241 Author: kevin.cyj <[email protected]> AuthorDate: Mon Jul 5 20:04:49 2021 +0800 [FLINK-23249][runtime] Introduce ShuffleMasterContext to ShuffleMaster --- .../io/network/NettyShuffleServiceFactory.java | 6 +-- .../jobmaster/JobManagerSharedServices.java | 7 +++- .../runtime/shuffle/ShuffleMasterContext.java | 36 ++++++++++++++++ .../runtime/shuffle/ShuffleMasterContextImpl.java | 48 ++++++++++++++++++++++ .../runtime/shuffle/ShuffleServiceFactory.java | 5 +-- .../runtime/shuffle/ShuffleServiceLoaderTest.java | 3 +- 6 files changed, 97 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java index 3f016c2..5c9b00d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.configuration.Configuration; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; @@ -36,6 +35,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGateFac import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.shuffle.ShuffleEnvironmentContext; +import org.apache.flink.runtime.shuffle.ShuffleMasterContext; import org.apache.flink.runtime.shuffle.ShuffleServiceFactory; import org.apache.flink.runtime.taskmanager.NettyShuffleEnvironmentConfiguration; import org.apache.flink.runtime.util.Hardware; @@ -55,8 +55,8 @@ public class NettyShuffleServiceFactory private static final String DIR_NAME_PREFIX = "netty-shuffle"; @Override - public NettyShuffleMaster createShuffleMaster(Configuration configuration) { - return new NettyShuffleMaster(configuration); + public NettyShuffleMaster createShuffleMaster(ShuffleMasterContext shuffleMasterContext) { + return new NettyShuffleMaster(shuffleMasterContext.getConfiguration()); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java index b46e1e8..1e0f0ae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerSharedServices.java @@ -27,6 +27,8 @@ import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.shuffle.ShuffleMasterContext; +import org.apache.flink.runtime.shuffle.ShuffleMasterContextImpl; import org.apache.flink.runtime.shuffle.ShuffleServiceLoader; import org.apache.flink.runtime.util.Hardware; import org.apache.flink.util.ExceptionUtils; @@ -144,8 +146,11 @@ public class JobManagerSharedServices { Hardware.getNumberCPUCores(), new ExecutorThreadFactory("jobmanager-future")); + final ShuffleMasterContext shuffleMasterContext = + new ShuffleMasterContextImpl(config, fatalErrorHandler); final ShuffleMaster<?> shuffleMaster = - ShuffleServiceLoader.loadShuffleServiceFactory(config).createShuffleMaster(config); + ShuffleServiceLoader.loadShuffleServiceFactory(config) + .createShuffleMaster(shuffleMasterContext); return new JobManagerSharedServices( futureExecutor, libraryCacheManager, shuffleMaster, blobServer); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMasterContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMasterContext.java new file mode 100644 index 0000000..a35b960 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMasterContext.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.configuration.Configuration; + +/** + * Shuffle context used to create {@link ShuffleMaster}. It can work as a proxy to other cluster + * components and hide these components from users. For example, the customized shuffle master can + * access the cluster fatal error handler through this context and in the future, more components + * like the resource manager partition tracker will be accessible. + */ +public interface ShuffleMasterContext { + + /** @return the cluster configuration. */ + Configuration getConfiguration(); + + /** Handles the fatal error if any. */ + void onFatalError(Throwable throwable); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMasterContextImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMasterContextImpl.java new file mode 100644 index 0000000..111c57b --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleMasterContextImpl.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.shuffle; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.rpc.FatalErrorHandler; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** The default implementation of {@link ShuffleMasterContext}. */ +public class ShuffleMasterContextImpl implements ShuffleMasterContext { + + private final Configuration configuration; + + private final FatalErrorHandler fatalErrorHandler; + + public ShuffleMasterContextImpl( + Configuration configuration, FatalErrorHandler fatalErrorHandler) { + this.configuration = checkNotNull(configuration); + this.fatalErrorHandler = checkNotNull(fatalErrorHandler); + } + + @Override + public Configuration getConfiguration() { + return configuration; + } + + @Override + public void onFatalError(Throwable throwable) { + fatalErrorHandler.onFatalError(throwable); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java index 9cd8166..6d81a22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleServiceFactory.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.shuffle; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter; import org.apache.flink.runtime.io.network.partition.consumer.IndexedInputGate; @@ -39,10 +38,10 @@ public interface ShuffleServiceFactory< /** * Factory method to create a specific {@link ShuffleMaster} implementation. * - * @param configuration Flink configuration + * @param shuffleMasterContext shuffle context for shuffle master. * @return shuffle manager implementation */ - ShuffleMaster<SD> createShuffleMaster(Configuration configuration); + ShuffleMaster<SD> createShuffleMaster(ShuffleMasterContext shuffleMasterContext); /** * Factory method to create a specific local {@link ShuffleEnvironment} implementation. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java index 2e71b19..ecbc479 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleServiceLoaderTest.java @@ -76,7 +76,8 @@ public class ShuffleServiceLoaderTest extends TestLogger { implements ShuffleServiceFactory< ShuffleDescriptor, ResultPartitionWriter, IndexedInputGate> { @Override - public ShuffleMaster<ShuffleDescriptor> createShuffleMaster(Configuration configuration) { + public ShuffleMaster<ShuffleDescriptor> createShuffleMaster( + ShuffleMasterContext shuffleMasterContext) { throw new UnsupportedOperationException(); }
