[ https://issues.apache.org/jira/browse/FLINK-8959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16405348#comment-16405348 ]

ASF GitHub Bot commented on FLINK-8959:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5719#discussion_r175566292
  
    --- Diff: flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java ---
    @@ -18,292 +18,188 @@
     
     package org.apache.flink.test.accumulators;
     
    -import org.apache.flink.api.common.JobExecutionResult;
    -import org.apache.flink.api.common.JobID;
     import org.apache.flink.api.common.Plan;
    -import org.apache.flink.api.common.accumulators.Accumulator;
     import org.apache.flink.api.common.accumulators.IntCounter;
     import org.apache.flink.api.common.functions.RichFlatMapFunction;
     import org.apache.flink.api.common.io.OutputFormat;
     import org.apache.flink.api.java.DataSet;
     import org.apache.flink.api.java.ExecutionEnvironment;
    -import org.apache.flink.api.java.LocalEnvironment;
    +import org.apache.flink.client.program.ClusterClient;
     import org.apache.flink.configuration.AkkaOptions;
    -import org.apache.flink.configuration.ConfigConstants;
     import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.HeartbeatManagerOptions;
    +import org.apache.flink.core.testutils.MultiShotLatch;
     import org.apache.flink.optimizer.DataStatistics;
     import org.apache.flink.optimizer.Optimizer;
     import org.apache.flink.optimizer.plan.OptimizedPlan;
     import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
    -import org.apache.flink.runtime.akka.AkkaUtils;
    -import org.apache.flink.runtime.akka.ListeningBehaviour;
    -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    -import org.apache.flink.runtime.instance.ActorGateway;
    -import org.apache.flink.runtime.instance.AkkaActorGateway;
     import org.apache.flink.runtime.jobgraph.JobGraph;
    -import org.apache.flink.runtime.messages.JobManagerMessages;
    -import org.apache.flink.runtime.testingUtils.TestingCluster;
    -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
    -import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
     import org.apache.flink.runtime.testingUtils.TestingUtils;
     import org.apache.flink.streaming.api.datastream.DataStream;
     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    +import org.apache.flink.test.util.MiniClusterResource;
    +import org.apache.flink.testutils.category.Flip6;
     import org.apache.flink.util.Collector;
     import org.apache.flink.util.TestLogger;
     
    -import akka.actor.ActorRef;
    -import akka.actor.ActorSystem;
    -import akka.pattern.Patterns;
    -import akka.testkit.JavaTestKit;
    -import akka.util.Timeout;
    -import org.junit.After;
     import org.junit.Before;
    +import org.junit.ClassRule;
     import org.junit.Test;
    +import org.junit.experimental.categories.Category;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
     import java.io.IOException;
     import java.util.ArrayList;
     import java.util.List;
     import java.util.Map;
    -import java.util.concurrent.TimeUnit;
     
    -import scala.concurrent.Await;
    -import scala.concurrent.Future;
    -import scala.concurrent.duration.FiniteDuration;
    -
    -import static org.junit.Assert.fail;
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.assertTrue;
     
     /**
    - * Tests the availability of accumulator results during runtime. The test case tests a user-defined
    - * accumulator and Flink's internal accumulators for two consecutive tasks.
    - *
    - * <p>CHAINED[Source -> Map] -> Sink
    - *
    - * <p>Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
    - * the task to the task manager which notifies the job manager and sends the current accumulators.
    - * The task blocks until the test has been notified about the current accumulator values.
    - *
    - * <p>A barrier between the operators ensures that that pipelining is disabled for the streaming test.
    - * The batch job reads the records one at a time. The streaming code buffers the records beforehand;
    - * that's why exact guarantees about the number of records read are very hard to make. Thus, why we
    - * check for an upper bound of the elements read.
    + * Tests the availability of accumulator results during runtime.
      */
    +@Category(Flip6.class)
     public class AccumulatorLiveITCase extends TestLogger {
     
        private static final Logger LOG = LoggerFactory.getLogger(AccumulatorLiveITCase.class);
     
    -   private static ActorSystem system;
    -   private static ActorGateway jobManagerGateway;
    -   private static ActorRef taskManager;
    -
    -   private static JobID jobID;
    -   private static JobGraph jobGraph;
    -
        // name of user accumulator
        private static final String ACCUMULATOR_NAME = "test";
     
    +   private static final long HEARTBEAT_INTERVAL = 50L;
    +
        // number of heartbeat intervals to check
        private static final int NUM_ITERATIONS = 5;
     
    -   private static List<String> inputData = new ArrayList<>(NUM_ITERATIONS);
    +   private static final List<Integer> inputData = new ArrayList<>(NUM_ITERATIONS);
     
    -   private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
    +   static {
    +           // generate test data
    +           for (int i = 0; i < NUM_ITERATIONS; i++) {
    +                   inputData.add(i);
    +           }
    +   }
     
    -   @Before
    -   public void before() throws Exception {
    -           system = AkkaUtils.createLocalActorSystem(new Configuration());
    +   @ClassRule
    +   public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
    +           new MiniClusterResource.MiniClusterResourceConfiguration(
    +                   getConfiguration(),
    +                   1,
    +                   1),
    +           true);
     
    +   private static Configuration getConfiguration() {
                Configuration config = new Configuration();
    -           config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
    -           config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
                config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
    -           TestingCluster testingCluster = new TestingCluster(config, false, true);
    -           testingCluster.start();
    -
    -           jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
    -           taskManager = testingCluster.getTaskManagersAsJava().get(0);
    -
    -           // generate test data
    -           for (int i = 0; i < NUM_ITERATIONS; i++) {
    -                   inputData.add(i, String.valueOf(i + 1));
    -           }
    +           config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);
     
    -           NotifyingMapper.finished = false;
    +           return config;
        }
     
    -   @After
    -   public void after() throws Exception {
    -           JavaTestKit.shutdownActorSystem(system);
    -           inputData.clear();
    +   @Before
    +   public void resetLatches() throws InterruptedException {
    +           NotifyingMapper.reset();
        }
     
        @Test
        public void testBatch() throws Exception {
    -
    -           /** The program **/
    -           ExecutionEnvironment env = new BatchPlanExtractor();
    +           ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
     
    -           DataSet<String> input = env.fromCollection(inputData);
    +           DataSet<Integer> input = env.fromCollection(inputData);
                input
                                .flatMap(new NotifyingMapper())
    -                           .output(new NotifyingOutputFormat());
    -
    -           env.execute();
    +                           .output(new DummyOutputFormat());
     
                // Extract job graph and set job id for the task to notify of accumulator changes.
    -           jobGraph = getOptimizedPlan(((BatchPlanExtractor) env).plan);
    -           jobID = jobGraph.getJobID();
    +           JobGraph jobGraph = getJobGraph(env.createProgramPlan());
     
    -           verifyResults();
    +           submitJobAndVerifyResults(jobGraph);
        }
     
        @Test
        public void testStreaming() throws Exception {
     
    -           StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
    +           StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setParallelism(1);
     
    -           DataStream<String> input = env.fromCollection(inputData);
    +           DataStream<Integer> input = env.fromCollection(inputData);
                input
                                .flatMap(new NotifyingMapper())
    -                           .writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining();
    +                           .writeUsingOutputFormat(new DummyOutputFormat()).disableChaining();
     
    -           jobGraph = env.getStreamGraph().getJobGraph();
    -           jobID = jobGraph.getJobID();
    +           JobGraph jobGraph = env.getStreamGraph().getJobGraph();
     
    -           verifyResults();
    +           submitJobAndVerifyResults(jobGraph);
        }
     
    -   private static void verifyResults() {
    -           new JavaTestKit(system) {{
    -
    -                   ActorGateway selfGateway = new AkkaActorGateway(getRef(), jobManagerGateway.leaderSessionID());
    -
    -                   // register for accumulator changes
    -                   jobManagerGateway.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), selfGateway);
    -                   expectMsgEquals(TIMEOUT, true);
    -
    -                   // submit job
    -
    -                   jobManagerGateway.tell(
    -                                   new JobManagerMessages.SubmitJob(
    -                                                   jobGraph,
    -                                                   ListeningBehaviour.EXECUTION_RESULT),
    -                                   selfGateway);
    -                   expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
    -
    -                   TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
    -                   Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators();
    +   private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {
     
    -                   ExecutionAttemptID mapperTaskID = null;
    -
    -                   ExecutionAttemptID sinkTaskID = null;
    -
    -                   /* Check for accumulator values */
    -                   if (checkUserAccumulators(0, userAccumulators)) {
    -                           LOG.info("Passed initial check for map task.");
    -                   } else {
    -                           fail("Wrong accumulator results when map task begins execution.");
    -                   }
    -
    -                   int expectedAccVal = 0;
    -
    -                   /* for mapper task */
    -                   for (int i = 1; i <= NUM_ITERATIONS; i++) {
    -                           expectedAccVal += i;
    -
    -                           // receive message
    -                           msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
    -                           userAccumulators = msg.userAccumulators();
    -
    -                           LOG.info("{}", userAccumulators);
    -
    -                           if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
    -                                   LOG.info("Passed round #" + i);
    -                           } else if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
    -                                   // we determined the wrong task id and need to switch the two here
    -                                   ExecutionAttemptID temp = mapperTaskID;
    -                                   mapperTaskID = sinkTaskID;
    -                                   sinkTaskID = temp;
    -                                   LOG.info("Passed round #" + i);
    -                           } else {
    -                                   fail("Failed in round #" + i);
    -                           }
    -                   }
    -
    -                   msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
    -                   userAccumulators = msg.userAccumulators();
    -
    -                   if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
    -                           LOG.info("Passed initial check for sink task.");
    -                   } else {
    -                           fail("Wrong accumulator results when sink task begins execution.");
    -                   }
    +           ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
     
    -                   /* for sink task */
    -                   for (int i = 1; i <= NUM_ITERATIONS; i++) {
    +           client.setDetached(true);
    +           client.submitJob(jobGraph, AccumulatorLiveITCase.class.getClassLoader());
     
    -                           // receive message
    -                           msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
    +           try {
    +                   NotifyingMapper.notifyLatch.await();
    +                   Thread.sleep(HEARTBEAT_INTERVAL * 4); // wait for heartbeat
    --- End diff --
    
    Well... yeah, we could use a deadline and polling; it would probably even be faster.
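
    For reference, a minimal sketch of the deadline-and-polling idea (not part of the PR; the accumulatorsMatchExpectedValues() helper is hypothetical, and polling once per heartbeat interval is just an assumption):

        // Hypothetical sketch: poll until the accumulators reach the expected state,
        // or fail once the deadline expires, instead of sleeping for a fixed
        // multiple of the heartbeat interval.
        private static void waitForAccumulators(long timeoutMillis) throws InterruptedException {
                final long deadlineNanos = System.nanoTime() + timeoutMillis * 1_000_000L;
                while (true) {
                        if (accumulatorsMatchExpectedValues()) { // hypothetical boolean check
                                return; // expected state reached, stop waiting
                        }
                        if (System.nanoTime() >= deadlineNanos) {
                                throw new AssertionError("Accumulators did not reach the expected values in time.");
                        }
                        Thread.sleep(HEARTBEAT_INTERVAL); // re-check after one heartbeat interval
                }
        }

    The fixed Thread.sleep(HEARTBEAT_INTERVAL * 4) call could then be replaced by something like waitForAccumulators(10_000L).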


> Port AccumulatorLiveITCase to flip6
> -----------------------------------
>
>                 Key: FLINK-8959
>                 URL: https://issues.apache.org/jira/browse/FLINK-8959
>             Project: Flink
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 1.5.0
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Blocker
>             Fix For: 1.5.0
>
>



