[
https://issues.apache.org/jira/browse/FLINK-10541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16648999#comment-16648999
]
ASF GitHub Bot commented on FLINK-10541:
----------------------------------------
zentol closed pull request #6835: [FLINK-10541] [tests] Removed unused code
depends on LocalFlinkMiniCl…
URL: https://github.com/apache/flink/pull/6835
This PR was merged from a forked repository.
Because GitHub hides the original diff of a foreign (forked) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:
diff --git
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index 363327a2d28..77818bcce4f 100644
---
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -20,24 +20,13 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.configuration.WebOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.runtime.messages.TaskManagerMessages;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import akka.actor.ActorRef;
-import akka.dispatch.Futures;
-import akka.pattern.Patterns;
-import akka.util.Timeout;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.junit.Assert;
@@ -52,13 +41,10 @@
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
-import java.lang.reflect.Method;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
-import java.nio.file.Files;
-import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -73,10 +59,8 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import scala.concurrent.Await;
import scala.concurrent.ExecutionContext;
import scala.concurrent.ExecutionContext$;
-import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -103,9 +87,6 @@
public static final Time DEFAULT_HTTP_TIMEOUT = Time.seconds(10L);
- static final String NEW_CODEBASE = "new";
- static final String CODEBASE_KEY = "codebase";
-
//
------------------------------------------------------------------------
protected static File logDir;
@@ -120,139 +101,6 @@ private static void verifyJvmOptions() {
+ "m", heap > MINIMUM_HEAP_SIZE_MB - 50);
}
- public static LocalFlinkMiniCluster startCluster(
- int numTaskManagers,
- int taskManagerNumSlots,
- boolean startWebserver,
- boolean startZooKeeper,
- boolean singleActorSystem) throws Exception {
-
- Configuration config = new Configuration();
-
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
numTaskManagers);
- config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,
taskManagerNumSlots);
-
- config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER,
startWebserver);
-
- if (startZooKeeper) {
-
config.setInteger(ConfigConstants.LOCAL_NUMBER_JOB_MANAGER, 3);
- config.setString(HighAvailabilityOptions.HA_MODE,
"zookeeper");
- }
-
- return startCluster(config, singleActorSystem);
- }
-
- public static LocalFlinkMiniCluster startCluster(
- Configuration config,
- boolean singleActorSystem) throws Exception {
-
- if (!config.contains(WebOptions.LOG_PATH) ||
!config.containsKey(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY)) {
- logDir = File.createTempFile("TestBaseUtils-logdir",
null);
- Assert.assertTrue("Unable to delete temp file",
logDir.delete());
- Assert.assertTrue("Unable to create temp directory",
logDir.mkdir());
- Path logFile = Files.createFile(new File(logDir,
"jobmanager.log").toPath());
- Files.createFile(new File(logDir,
"jobmanager.out").toPath());
-
- if (!config.contains(WebOptions.LOG_PATH)) {
- config.setString(WebOptions.LOG_PATH,
logFile.toString());
- }
-
- if
(!config.containsKey(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY)) {
-
config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString());
- }
- }
-
- if (!config.contains(WebOptions.PORT)) {
- config.setInteger(WebOptions.PORT, 8081);
- }
-
- if (!config.contains(AkkaOptions.ASK_TIMEOUT)) {
- config.setString(AkkaOptions.ASK_TIMEOUT,
DEFAULT_AKKA_ASK_TIMEOUT + "s");
- }
-
- if (!config.contains(AkkaOptions.STARTUP_TIMEOUT)) {
- config.setString(AkkaOptions.STARTUP_TIMEOUT,
DEFAULT_AKKA_STARTUP_TIMEOUT);
- }
-
- if (!config.contains(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE)) {
-
config.setBoolean(CoreOptions.FILESYTEM_DEFAULT_OVERRIDE, true);
- }
-
- if (!config.contains(TaskManagerOptions.MANAGED_MEMORY_SIZE)) {
-
config.setString(TaskManagerOptions.MANAGED_MEMORY_SIZE,
TASK_MANAGER_MEMORY_SIZE);
- }
-
- LocalFlinkMiniCluster cluster = new
LocalFlinkMiniCluster(config, singleActorSystem);
-
- cluster.start();
-
- return cluster;
- }
-
- public static void stopCluster(LocalFlinkMiniCluster executor,
FiniteDuration timeout) throws Exception {
- if (logDir != null) {
- FileUtils.deleteDirectory(logDir);
- }
- if (executor != null) {
- int numUnreleasedBCVars = 0;
- int numActiveConnections = 0;
-
- if (executor.running()) {
- List<ActorRef> tms =
executor.getTaskManagersAsJava();
- List<Future<Object>>
bcVariableManagerResponseFutures = new ArrayList<>();
- List<Future<Object>>
numActiveConnectionsResponseFutures = new ArrayList<>();
-
- for (ActorRef tm : tms) {
-
bcVariableManagerResponseFutures.add(Patterns.ask(
- tm,
-
TaskManagerMessages.getRequestBroadcastVariablesWithReferences(),
- new Timeout(timeout)));
-
-
numActiveConnectionsResponseFutures.add(Patterns.ask(
- tm,
-
TaskManagerMessages.getRequestNumActiveConnections(),
- new Timeout(timeout)));
- }
-
- Future<Iterable<Object>>
bcVariableManagerFutureResponses = Futures.sequence(
-
bcVariableManagerResponseFutures, defaultExecutionContext());
-
- Iterable<Object> responses =
Await.result(bcVariableManagerFutureResponses, timeout);
-
- for (Object response : responses) {
- numUnreleasedBCVars +=
((TaskManagerMessages.ResponseBroadcastVariablesWithReferences)
response).number();
- }
-
- Future<Iterable<Object>>
numActiveConnectionsFutureResponses = Futures.sequence(
-
numActiveConnectionsResponseFutures, defaultExecutionContext());
-
- responses =
Await.result(numActiveConnectionsFutureResponses, timeout);
-
- for (Object response : responses) {
- numActiveConnections +=
((TaskManagerMessages.ResponseNumActiveConnections) response).number();
- }
- }
-
- executor.stop();
- try {
- Class<?> hadoopFileSystemClass = Class.forName(
- "org.apache.hadoop.fs.FileSystem",
- true,
- TestBaseUtils.class.getClassLoader());
-
- Method closeAllMethod =
hadoopFileSystemClass.getMethod("closeAll");
- closeAllMethod.invoke(null);
- } catch (Throwable e) {
- // ignore
- }
- System.gc();
-
- Assert.assertEquals("Not all broadcast variables were
released.", 0, numUnreleasedBCVars);
- Assert.assertEquals("Not all TCP connections were
released.", 0, numActiveConnections);
- }
-
- }
-
//
--------------------------------------------------------------------------------------------
// Result Checking
//
--------------------------------------------------------------------------------------------
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
> Removed unused code depends on LocalFlinkMiniCluster
> ----------------------------------------------------
>
> Key: FLINK-10541
> URL: https://issues.apache.org/jira/browse/FLINK-10541
> Project: Flink
> Issue Type: Sub-task
> Components: Tests
> Affects Versions: 1.7.0
> Reporter: TisonKun
> Assignee: TisonKun
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.7.0
>
>
> {{TestBaseUtils#startCluster}} depends on the legacy class
> {{LocalFlinkMiniCluster}}; however, the method itself is unused. Let's remove
> it, which also helps confirm that we can remove {{LocalFlinkMiniCluster}} directly.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)