[
https://issues.apache.org/jira/browse/FLINK-8959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405348#comment-16405348
]
ASF GitHub Bot commented on FLINK-8959:
---------------------------------------
Github user zentol commented on a diff in the pull request:
https://github.com/apache/flink/pull/5719#discussion_r175566292
--- Diff:
flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
---
@@ -18,292 +18,188 @@
package org.apache.flink.test.accumulators;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.api.common.accumulators.IntCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.streaming.api.datastream.DataStream;
import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.testutils.category.Flip6;
import org.apache.flink.util.Collector;
import org.apache.flink.util.TestLogger;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-import org.junit.After;
import org.junit.Before;
+import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
- * Tests the availability of accumulator results during runtime. The test
case tests a user-defined
- * accumulator and Flink's internal accumulators for two consecutive tasks.
- *
- * <p>CHAINED[Source -> Map] -> Sink
- *
- * <p>Checks are performed as the elements arrive at the operators. Checks
consist of a message sent by
- * the task to the task manager which notifies the job manager and sends
the current accumulators.
- * The task blocks until the test has been notified about the current
accumulator values.
- *
- * <p>A barrier between the operators ensures that pipelining is
disabled for the streaming test.
- * The batch job reads the records one at a time. The streaming code
buffers the records beforehand;
- * that's why exact guarantees about the number of records read are very
hard to make. Thus we
- * check for an upper bound of the elements read.
+ * Tests the availability of accumulator results during runtime.
*/
+@Category(Flip6.class)
public class AccumulatorLiveITCase extends TestLogger {
private static final Logger LOG =
LoggerFactory.getLogger(AccumulatorLiveITCase.class);
- private static ActorSystem system;
- private static ActorGateway jobManagerGateway;
- private static ActorRef taskManager;
-
- private static JobID jobID;
- private static JobGraph jobGraph;
-
// name of user accumulator
private static final String ACCUMULATOR_NAME = "test";
+ private static final long HEARTBEAT_INTERVAL = 50L;
+
// number of heartbeat intervals to check
private static final int NUM_ITERATIONS = 5;
- private static List<String> inputData = new ArrayList<>(NUM_ITERATIONS);
+ private static final List<Integer> inputData = new
ArrayList<>(NUM_ITERATIONS);
- private static final FiniteDuration TIMEOUT = new FiniteDuration(10,
TimeUnit.SECONDS);
+ static {
+ // generate test data
+ for (int i = 0; i < NUM_ITERATIONS; i++) {
+ inputData.add(i);
+ }
+ }
- @Before
- public void before() throws Exception {
- system = AkkaUtils.createLocalActorSystem(new Configuration());
+ @ClassRule
+ public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new
MiniClusterResource(
+ new MiniClusterResource.MiniClusterResourceConfiguration(
+ getConfiguration(),
+ 1,
+ 1),
+ true);
+ private static Configuration getConfiguration() {
Configuration config = new Configuration();
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS,
1);
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
config.setString(AkkaOptions.ASK_TIMEOUT,
TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
- TestingCluster testingCluster = new TestingCluster(config,
false, true);
- testingCluster.start();
-
- jobManagerGateway =
testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
- taskManager = testingCluster.getTaskManagersAsJava().get(0);
-
- // generate test data
- for (int i = 0; i < NUM_ITERATIONS; i++) {
- inputData.add(i, String.valueOf(i + 1));
- }
+ config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL,
HEARTBEAT_INTERVAL);
- NotifyingMapper.finished = false;
+ return config;
}
- @After
- public void after() throws Exception {
- JavaTestKit.shutdownActorSystem(system);
- inputData.clear();
+ @Before
+ public void resetLatches() throws InterruptedException {
+ NotifyingMapper.reset();
}
@Test
public void testBatch() throws Exception {
-
- /** The program **/
- ExecutionEnvironment env = new BatchPlanExtractor();
+ ExecutionEnvironment env =
ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
- DataSet<String> input = env.fromCollection(inputData);
+ DataSet<Integer> input = env.fromCollection(inputData);
input
.flatMap(new NotifyingMapper())
- .output(new NotifyingOutputFormat());
-
- env.execute();
+ .output(new DummyOutputFormat());
// Extract job graph and set job id for the task to notify of
accumulator changes.
- jobGraph = getOptimizedPlan(((BatchPlanExtractor) env).plan);
- jobID = jobGraph.getJobID();
+ JobGraph jobGraph = getJobGraph(env.createProgramPlan());
- verifyResults();
+ submitJobAndVerifyResults(jobGraph);
}
@Test
public void testStreaming() throws Exception {
- StreamExecutionEnvironment env = new
DummyStreamExecutionEnvironment();
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
- DataStream<String> input = env.fromCollection(inputData);
+ DataStream<Integer> input = env.fromCollection(inputData);
input
.flatMap(new NotifyingMapper())
- .writeUsingOutputFormat(new
NotifyingOutputFormat()).disableChaining();
+ .writeUsingOutputFormat(new
DummyOutputFormat()).disableChaining();
- jobGraph = env.getStreamGraph().getJobGraph();
- jobID = jobGraph.getJobID();
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
- verifyResults();
+ submitJobAndVerifyResults(jobGraph);
}
- private static void verifyResults() {
- new JavaTestKit(system) {{
-
- ActorGateway selfGateway = new
AkkaActorGateway(getRef(), jobManagerGateway.leaderSessionID());
-
- // register for accumulator changes
- jobManagerGateway.tell(new
TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), selfGateway);
- expectMsgEquals(TIMEOUT, true);
-
- // submit job
-
- jobManagerGateway.tell(
- new JobManagerMessages.SubmitJob(
- jobGraph,
-
ListeningBehaviour.EXECUTION_RESULT),
- selfGateway);
- expectMsgClass(TIMEOUT,
JobManagerMessages.JobSubmitSuccess.class);
-
- TestingJobManagerMessages.UpdatedAccumulators msg =
(TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
- Map<String, Accumulator<?, ?>> userAccumulators =
msg.userAccumulators();
+ private static void submitJobAndVerifyResults(JobGraph jobGraph) throws
Exception {
- ExecutionAttemptID mapperTaskID = null;
-
- ExecutionAttemptID sinkTaskID = null;
-
- /* Check for accumulator values */
- if (checkUserAccumulators(0, userAccumulators)) {
- LOG.info("Passed initial check for map task.");
- } else {
- fail("Wrong accumulator results when map task
begins execution.");
- }
-
- int expectedAccVal = 0;
-
- /* for mapper task */
- for (int i = 1; i <= NUM_ITERATIONS; i++) {
- expectedAccVal += i;
-
- // receive message
- msg =
(TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
- userAccumulators = msg.userAccumulators();
-
- LOG.info("{}", userAccumulators);
-
- if (checkUserAccumulators(expectedAccVal,
userAccumulators)) {
- LOG.info("Passed round #" + i);
- } else if
(checkUserAccumulators(expectedAccVal, userAccumulators)) {
- // we determined the wrong task id and
need to switch the two here
- ExecutionAttemptID temp = mapperTaskID;
- mapperTaskID = sinkTaskID;
- sinkTaskID = temp;
- LOG.info("Passed round #" + i);
- } else {
- fail("Failed in round #" + i);
- }
- }
-
- msg = (TestingJobManagerMessages.UpdatedAccumulators)
receiveOne(TIMEOUT);
- userAccumulators = msg.userAccumulators();
-
- if (checkUserAccumulators(expectedAccVal,
userAccumulators)) {
- LOG.info("Passed initial check for sink task.");
- } else {
- fail("Wrong accumulator results when sink task
begins execution.");
- }
+ ClusterClient<?> client =
MINI_CLUSTER_RESOURCE.getClusterClient();
- /* for sink task */
- for (int i = 1; i <= NUM_ITERATIONS; i++) {
+ client.setDetached(true);
+ client.submitJob(jobGraph,
AccumulatorLiveITCase.class.getClassLoader());
- // receive message
- msg =
(TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
+ try {
+ NotifyingMapper.notifyLatch.await();
+ Thread.sleep(HEARTBEAT_INTERVAL * 4); // wait for
heartbeat
--- End diff --
Well... yeah, we could use a deadline and polling; that would probably be even
faster.
> Port AccumulatorLiveITCase to flip6
> -----------------------------------
>
> Key: FLINK-8959
> URL: https://issues.apache.org/jira/browse/FLINK-8959
> Project: Flink
> Issue Type: Improvement
> Components: Tests
> Affects Versions: 1.5.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Priority: Blocker
> Fix For: 1.5.0
>
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)