tillrohrmann closed pull request #6748: [FLINK-10396] Remove CodebaseType
URL: https://github.com/apache/flink/pull/6748
This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign pull request (one
from a fork) once it is merged, the diff is reproduced below for the
sake of provenance:
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
index b90277fa771..3ae830d0618 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebFrontendITCase.java
@@ -152,11 +152,7 @@ public void testResponseHeaders() throws Exception {
if (notFoundJobConnection.getResponseCode() >= 400) {
// we don't set the content-encoding header
Assert.assertNull(notFoundJobConnection.getContentEncoding());
- if (CLUSTER.getCodebaseType() ==
TestBaseUtils.CodebaseType.NEW) {
- Assert.assertEquals("application/json;
charset=UTF-8", notFoundJobConnection.getContentType());
- } else {
- Assert.assertEquals("text/plain;
charset=UTF-8", notFoundJobConnection.getContentType());
- }
+ Assert.assertEquals("application/json; charset=UTF-8",
notFoundJobConnection.getContentType());
} else {
throw new RuntimeException("Request for non-existing
job did not return an error.");
}
@@ -280,23 +276,13 @@ public void testStop() throws Exception {
final Deadline deadline = testTimeout.fromNow();
try (HttpTestClient client = new HttpTestClient("localhost",
CLUSTER.getWebUIPort())) {
- if (CLUSTER.getCodebaseType() ==
TestBaseUtils.CodebaseType.NEW) {
- // stop the job
- client.sendPatchRequest("/jobs/" + jid +
"/?mode=stop", deadline.timeLeft());
- HttpTestClient.SimpleHttpResponse response =
client.getNextResponse(deadline.timeLeft());
-
- assertEquals(HttpResponseStatus.ACCEPTED,
response.getStatus());
- assertEquals("application/json; charset=UTF-8",
response.getType());
- assertEquals("{}", response.getContent());
- } else {
- // stop the job
- client.sendDeleteRequest("/jobs/" + jid +
"/stop", deadline.timeLeft());
- HttpTestClient.SimpleHttpResponse response =
client.getNextResponse(deadline.timeLeft());
-
- assertEquals(HttpResponseStatus.OK,
response.getStatus());
- assertEquals("application/json; charset=UTF-8",
response.getType());
- assertEquals("{}", response.getContent());
- }
+ // stop the job
+ client.sendPatchRequest("/jobs/" + jid + "/?mode=stop",
deadline.timeLeft());
+ HttpTestClient.SimpleHttpResponse response =
client.getNextResponse(deadline.timeLeft());
+
+ assertEquals(HttpResponseStatus.ACCEPTED,
response.getStatus());
+ assertEquals("application/json; charset=UTF-8",
response.getType());
+ assertEquals("{}", response.getContent());
}
// wait for cancellation to finish
@@ -355,11 +341,7 @@ public void testStopYarn() throws Exception {
HttpTestClient.SimpleHttpResponse response = client
.getNextResponse(deadline.timeLeft());
- if (CLUSTER.getCodebaseType() ==
TestBaseUtils.CodebaseType.NEW) {
- assertEquals(HttpResponseStatus.ACCEPTED,
response.getStatus());
- } else {
- assertEquals(HttpResponseStatus.OK,
response.getStatus());
- }
+ assertEquals(HttpResponseStatus.ACCEPTED,
response.getStatus());
assertEquals("application/json; charset=UTF-8",
response.getType());
assertEquals("{}", response.getContent());
}
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
index 6427f4de58d..a2138e14383 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -28,7 +28,6 @@
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.ExceptionUtils;
import org.junit.ClassRule;
@@ -67,7 +66,6 @@ public void testRunJar() throws Exception {
.setConfiguration(config)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
- .setCodebaseType(TestBaseUtils.CodebaseType.NEW)
.build());
clusterResource.before();
diff --git
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
index 4451e6d8bcb..18dd76e05b0 100644
---
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
+++
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/history/HistoryServerTest.java
@@ -29,7 +29,6 @@
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonGenerator;
@@ -81,7 +80,6 @@ public void setUp() throws Exception {
.setConfiguration(clusterConfig)
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(1)
- .setCodebaseType(TestBaseUtils.CodebaseType.NEW)
.build());
cluster.before();
}
diff --git
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
index c6b43c2920a..54bb16f038c 100644
---
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
+++
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellITCase.scala
@@ -19,13 +19,10 @@
package org.apache.flink.api.scala
import java.io._
-import java.util.Objects
-import org.apache.flink.configuration.{Configuration, CoreOptions,
RestOptions, TaskManagerOptions}
+import org.apache.flink.configuration.{Configuration, CoreOptions, RestOptions}
import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.runtime.minicluster.{MiniCluster,
MiniClusterConfiguration, StandaloneMiniCluster}
-import org.apache.flink.test.util.{MiniClusterResource, TestBaseUtils}
-import org.apache.flink.test.util.TestBaseUtils.CodebaseType
+import org.apache.flink.runtime.minicluster.{MiniCluster,
MiniClusterConfiguration}
import org.apache.flink.util.TestLogger
import org.junit._
import org.junit.rules.TemporaryFolder
@@ -314,7 +311,7 @@ class ScalaShellITCase extends TestLogger {
object ScalaShellITCase {
val configuration = new Configuration()
- var cluster: Option[Either[MiniCluster, StandaloneMiniCluster]] = None
+ var cluster: Option[MiniCluster] = None
var port: Int = _
var hostname : String = _
@@ -322,32 +319,20 @@ object ScalaShellITCase {
@BeforeClass
def beforeAll(): Unit = {
- val isNew = TestBaseUtils.isNewCodebase()
- if (isNew) {
- configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
- // set to different than default so not to interfere with
ScalaShellLocalStartupITCase
- configuration.setInteger(RestOptions.PORT, 8082)
- val miniConfig = new MiniClusterConfiguration.Builder()
- .setConfiguration(configuration)
- .setNumSlotsPerTaskManager(parallelism)
- .build()
-
- val miniCluster = new MiniCluster(miniConfig)
- miniCluster.start()
- port = miniCluster.getRestAddress.getPort
- hostname = miniCluster.getRestAddress.getHost
-
- cluster = Some(Left(miniCluster))
- } else {
- configuration.setString(CoreOptions.MODE, CoreOptions.LEGACY_MODE)
- configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, parallelism)
- val standaloneCluster = new StandaloneMiniCluster(configuration)
-
- hostname = standaloneCluster.getHostname
- port = standaloneCluster.getPort
-
- cluster = Some(Right(standaloneCluster))
- }
+ configuration.setString(CoreOptions.MODE, CoreOptions.NEW_MODE)
+ // set to different than default so not to interfere with
ScalaShellLocalStartupITCase
+ configuration.setInteger(RestOptions.PORT, 8082)
+ val miniConfig = new MiniClusterConfiguration.Builder()
+ .setConfiguration(configuration)
+ .setNumSlotsPerTaskManager(parallelism)
+ .build()
+
+ val miniCluster = new MiniCluster(miniConfig)
+ miniCluster.start()
+ port = miniCluster.getRestAddress.getPort
+ hostname = miniCluster.getRestAddress.getHost
+
+ cluster = Some(miniCluster)
}
@AfterClass
@@ -356,8 +341,7 @@ object ScalaShellITCase {
Thread.currentThread().setContextClassLoader(classOf[ScalaShellITCase].getClassLoader)
cluster.foreach {
- case Left(miniCluster) => miniCluster.close()
- case Right(miniCluster) => miniCluster.close()
+ miniCluster => miniCluster.close()
}
}
diff --git
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
index f13f57be0bc..3952a0f6259 100644
---
a/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
+++
b/flink-scala-shell/src/test/scala/org/apache/flink/api/scala/ScalaShellLocalStartupITCase.scala
@@ -20,9 +20,8 @@ package org.apache.flink.api.scala
import java.io._
-import org.apache.flink.configuration.{Configuration, CoreOptions}
+import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.clusterframework.BootstrapTools
-import org.apache.flink.test.util.TestBaseUtils
import org.apache.flink.util.TestLogger
import org.junit.rules.TemporaryFolder
import org.junit.{Assert, Rule, Test}
@@ -86,12 +85,6 @@ class ScalaShellLocalStartupITCase extends TestLogger {
System.setOut(new PrintStream(baos))
val configuration = new Configuration()
- val mode = if (TestBaseUtils.isNewCodebase()) {
- CoreOptions.NEW_MODE
- } else {
- CoreOptions.LEGACY_MODE
- }
- configuration.setString(CoreOptions.MODE, mode)
val dir = temporaryFolder.newFolder()
BootstrapTools.writeConfiguration(configuration, new File(dir,
"flink-conf.yaml"))
diff --git
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 1835fc6fdcd..9140bb4eff5 100644
---
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -19,10 +19,7 @@
package org.apache.flink.test.util;
import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.DefaultActorSystemLoader;
import org.apache.flink.client.program.MiniClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
@@ -30,17 +27,12 @@
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.minicluster.JobExecutorService;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
-import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
-import akka.actor.ActorSystem;
-import org.junit.Assume;
import org.junit.rules.ExternalResource;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
@@ -49,11 +41,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-import scala.Option;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.hamcrest.CoreMatchers.is;
-
/**
* Starts a Flink mini cluster as a resource and registers the respective
* ExecutionEnvironment and StreamExecutionEnvironment.
@@ -66,8 +53,6 @@
private final MiniClusterResourceConfiguration
miniClusterResourceConfiguration;
- private final TestBaseUtils.CodebaseType codebaseType;
-
private JobExecutorService jobExecutorService;
private ClusterClient<?> clusterClient;
@@ -82,11 +67,6 @@
public MiniClusterResource(final MiniClusterResourceConfiguration
miniClusterResourceConfiguration) {
this.miniClusterResourceConfiguration =
Preconditions.checkNotNull(miniClusterResourceConfiguration);
- this.codebaseType =
miniClusterResourceConfiguration.getCodebaseType();
- }
-
- public TestBaseUtils.CodebaseType getCodebaseType() {
- return codebaseType;
}
public int getNumberSlots() {
@@ -111,12 +91,9 @@ public int getWebUIPort() {
@Override
public void before() throws Exception {
- // verify that we are running in the correct test profile
- Assume.assumeThat(TestBaseUtils.getCodebaseType(),
is(equalTo(codebaseType)));
-
temporaryFolder.create();
- startJobExecutorService(codebaseType);
+ startMiniCluster();
numberSlots =
miniClusterResourceConfiguration.getNumberSlotsPerTaskManager() *
miniClusterResourceConfiguration.getNumberTaskManagers();
@@ -163,57 +140,6 @@ public void after() {
}
}
- private void startJobExecutorService(TestBaseUtils.CodebaseType
miniClusterType) throws Exception {
- switch (miniClusterType) {
- case LEGACY:
- startLegacyMiniCluster();
- break;
- case NEW:
- startMiniCluster();
- break;
- default:
- throw new FlinkRuntimeException("Unknown
MiniClusterType " + miniClusterType + '.');
- }
- }
-
- private void startLegacyMiniCluster() throws Exception {
- final Configuration configuration = new
Configuration(miniClusterResourceConfiguration.getConfiguration());
-
configuration.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
miniClusterResourceConfiguration.getNumberTaskManagers());
- configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,
miniClusterResourceConfiguration.getNumberSlotsPerTaskManager());
- configuration.setString(CoreOptions.TMP_DIRS,
temporaryFolder.newFolder().getAbsolutePath());
-
- final LocalFlinkMiniCluster flinkMiniCluster =
TestBaseUtils.startCluster(
- configuration,
- miniClusterResourceConfiguration.getRpcServiceSharing()
== RpcServiceSharing.SHARED);
-
- jobExecutorService = flinkMiniCluster;
-
- switch
(miniClusterResourceConfiguration.getRpcServiceSharing()) {
- case SHARED:
- Option<ActorSystem> actorSystemOption =
flinkMiniCluster.firstActorSystem();
-
Preconditions.checkState(actorSystemOption.isDefined());
-
- final ActorSystem actorSystem =
actorSystemOption.get();
- clusterClient = new StandaloneClusterClient(
- configuration,
-
flinkMiniCluster.highAvailabilityServices(),
- true,
- new
DefaultActorSystemLoader(actorSystem));
- break;
- case DEDICATED:
- clusterClient = new
StandaloneClusterClient(configuration,
flinkMiniCluster.highAvailabilityServices(), true);
- break;
- }
-
- Configuration restClientConfig = new Configuration();
- restClientConfig.setInteger(JobManagerOptions.PORT,
flinkMiniCluster.getLeaderRPCPort());
- this.restClusterClientConfig = new
UnmodifiableConfiguration(restClientConfig);
-
- if (flinkMiniCluster.webMonitor().isDefined()) {
- webUIPort =
flinkMiniCluster.webMonitor().get().getServerPort();
- }
- }
-
private void startMiniCluster() throws Exception {
final Configuration configuration =
miniClusterResourceConfiguration.getConfiguration();
configuration.setString(CoreOptions.TMP_DIRS,
temporaryFolder.newFolder().getAbsolutePath());
diff --git
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
index c9389201ed8..bd521a2f202 100644
---
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
+++
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResourceConfiguration.java
@@ -37,8 +37,6 @@
private final Time shutdownTimeout;
- private final TestBaseUtils.CodebaseType codebaseType;
-
private final RpcServiceSharing rpcServiceSharing;
MiniClusterResourceConfiguration(
@@ -46,13 +44,11 @@
int numberTaskManagers,
int numberSlotsPerTaskManager,
Time shutdownTimeout,
- TestBaseUtils.CodebaseType codebaseType,
RpcServiceSharing rpcServiceSharing) {
this.configuration = Preconditions.checkNotNull(configuration);
this.numberTaskManagers = numberTaskManagers;
this.numberSlotsPerTaskManager = numberSlotsPerTaskManager;
this.shutdownTimeout =
Preconditions.checkNotNull(shutdownTimeout);
- this.codebaseType = Preconditions.checkNotNull(codebaseType);
this.rpcServiceSharing =
Preconditions.checkNotNull(rpcServiceSharing);
}
@@ -72,14 +68,6 @@ public Time getShutdownTimeout() {
return shutdownTimeout;
}
- /**
- * @deprecated Will be irrelevant once the legacy mode has been removed.
- */
- @Deprecated
- public TestBaseUtils.CodebaseType getCodebaseType() {
- return codebaseType;
- }
-
public RpcServiceSharing getRpcServiceSharing() {
return rpcServiceSharing;
}
@@ -93,7 +81,6 @@ public RpcServiceSharing getRpcServiceSharing() {
private int numberTaskManagers = 1;
private int numberSlotsPerTaskManager = 1;
private Time shutdownTimeout =
AkkaUtils.getTimeoutAsTime(configuration);
- private TestBaseUtils.CodebaseType codebaseType =
TestBaseUtils.getCodebaseType();
private RpcServiceSharing rpcServiceSharing =
RpcServiceSharing.SHARED;
@@ -117,22 +104,13 @@ public Builder setShutdownTimeout(Time shutdownTimeout) {
return this;
}
- /**
- * @deprecated Will be irrelevant once the legacy mode has been
removed.
- */
- @Deprecated
- public Builder setCodebaseType(TestBaseUtils.CodebaseType
codebaseType) {
- this.codebaseType = codebaseType;
- return this;
- }
-
public Builder setRpcServiceSharing(RpcServiceSharing
rpcServiceSharing) {
this.rpcServiceSharing = rpcServiceSharing;
return this;
}
public MiniClusterResourceConfiguration build() {
- return new
MiniClusterResourceConfiguration(configuration, numberTaskManagers,
numberSlotsPerTaskManager, shutdownTimeout, codebaseType, rpcServiceSharing);
+ return new
MiniClusterResourceConfiguration(configuration, numberTaskManagers,
numberSlotsPerTaskManager, shutdownTimeout, rpcServiceSharing);
}
}
}
diff --git
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index a0f52f2ae77..363327a2d28 100644
---
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -44,8 +44,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.annotation.Nonnull;
-
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
@@ -678,26 +676,6 @@ public static String getFromHTTP(String url, Time timeout)
throws Exception {
throw new TimeoutException("Could not get HTTP response in time
since the service is still unavailable.");
}
- @Nonnull
- public static CodebaseType getCodebaseType() {
- return Objects.equals(NEW_CODEBASE,
System.getProperty(CODEBASE_KEY)) ? CodebaseType.NEW : CodebaseType.LEGACY;
- }
-
- public static boolean isNewCodebase() {
- return CodebaseType.NEW == getCodebaseType();
- }
-
- /**
- * Type of the mini cluster to start.
- *
- * @deprecated Will be irrelevant once the legacy mode has been removed.
- */
- @Deprecated
- public enum CodebaseType {
- LEGACY,
- NEW
- }
-
/**
* Comparator for comparable Tuples.
* @param <T> tuple type
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 66a894fdda9..56df46e49ea 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -31,7 +31,6 @@
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -58,7 +57,6 @@
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(4)
- .setCodebaseType(TestBaseUtils.CodebaseType.NEW)
.build());
private RestClusterClient<StandaloneClusterId> client;
diff --git
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
index 4826a467491..f62ccf7374d 100644
---
a/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
+++
b/flink-tests/src/test/java/org/apache/flink/test/misc/AutoParallelismITCase.java
@@ -28,7 +28,6 @@
import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
import org.apache.flink.test.util.MiniClusterResource;
import org.apache.flink.test.util.MiniClusterResourceConfiguration;
-import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
@@ -80,9 +79,6 @@ public void testProgramWithAutoParallelism() throws Exception
{
assertEquals(PARALLELISM, resultCollection.size());
}
catch (Exception ex) {
- if (MINI_CLUSTER_RESOURCE.getCodebaseType() ==
TestBaseUtils.CodebaseType.LEGACY) {
- throw ex;
- }
assertTrue(
ExceptionUtils.findThrowableWithMessage(ex,
ExecutionGraphBuilder.PARALLELISM_AUTO_MAX_ERROR_MESSAGE).isPresent());
}
diff --git
a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 1a0520f68fe..3763f6592af 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -21,7 +21,6 @@
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.test.util.TestBaseUtils;
@@ -75,7 +74,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
import java.util.Scanner;
import java.util.Set;
import java.util.UUID;
@@ -153,7 +151,7 @@
protected org.apache.flink.configuration.Configuration
flinkConfiguration;
- protected boolean isNewMode;
+ protected final boolean isNewMode = true;
static {
YARN_CONFIGURATION = new YarnConfiguration();
@@ -198,8 +196,6 @@ public void checkClusterEmpty() {
}
flinkConfiguration = new
org.apache.flink.configuration.Configuration(globalConfiguration);
-
- isNewMode = Objects.equals(TestBaseUtils.CodebaseType.NEW,
TestBaseUtils.getCodebaseType());
}
/**
@@ -552,9 +548,6 @@ private static void start(YarnConfiguration conf, String
principal, String keyta
FileUtils.copyDirectory(new File(confDirPath),
tempConfPathForSecureRun);
- globalConfiguration.setString(CoreOptions.MODE,
- Objects.equals(TestBaseUtils.CodebaseType.NEW,
TestBaseUtils.getCodebaseType()) ? CoreOptions.NEW_MODE :
CoreOptions.LEGACY_MODE);
-
BootstrapTools.writeConfiguration(
globalConfiguration,
new File(tempConfPathForSecureRun,
"flink-conf.yaml"));
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services