[ 
https://issues.apache.org/jira/browse/FLINK-2373?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14740450#comment-14740450
 ] 

ASF GitHub Bot commented on FLINK-2373:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1066#discussion_r39253434
  
    --- Diff: 
flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/RemoteEnvironmentITCase.java
 ---
    @@ -0,0 +1,164 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.test.javaApiOperators;
    +
    +import org.apache.flink.api.common.functions.RichMapPartitionFunction;
    +import org.apache.flink.api.common.io.GenericInputFormat;
    +import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
    +import org.apache.flink.client.program.ProgramInvocationException;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.io.GenericInputSplit;
    +import org.apache.flink.test.util.ForkableFlinkMiniCluster;
    +import org.apache.flink.util.Collector;
    +import org.junit.AfterClass;
    +import org.junit.Assert;
    +import org.junit.BeforeClass;
    +import org.junit.Test;
    +
    +import java.io.IOException;
    +import java.util.ArrayList;
    +import java.util.List;
    +
    +import static org.junit.Assert.assertEquals;
    +import static org.junit.Assert.fail;
    +
    +@SuppressWarnings("serial")
    +public class RemoteEnvironmentITCase {
    +
    +   private static final int TM_SLOTS = 4;
    +
    +   private static final int NUM_TM = 1;
    +
    +   private static final int USER_DOP = 2;
    +
    +   private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    +
    +   private static final String VALID_STARTUP_TIMEOUT = "100 s";
    +
    +   private static ForkableFlinkMiniCluster cluster;
    +
    +   @BeforeClass
    +   public static void setupCluster() {
    +           try {
    +                   Configuration config = new Configuration();
    +                   
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TM);
    +                   
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, TM_SLOTS);
    +                   cluster = new ForkableFlinkMiniCluster(config, false);
    +                   cluster.start();
    +           }
    +           catch (Exception e) {
    +                   e.printStackTrace();
    +                   fail("Error starting test cluster: " + e.getMessage());
    +           }
    +   }
    +
    +   @AfterClass
    +   public static void tearDownCluster() {
    +           try {
    +                   cluster.stop();
    +           }
    +           catch (Throwable t) {
    +                   t.printStackTrace();
    +                   fail("Cluster shutdown caused an exception: " + 
t.getMessage());
    +           }
    +   }
    +
    +   /**
    +    * Ensure that Akka configuration parameters can be set.
    +    */
    +   @Test(expected=IllegalArgumentException.class)
    +   public void testInvalidAkkaConfiguration() throws Throwable {
    +           Configuration config = new Configuration();
    +           config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, 
INVALID_STARTUP_TIMEOUT);
    +
    +           final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
    +                           cluster.hostname(),
    +                           cluster.getLeaderRPCPort(),
    +                           config
    +           );
    +           env.getConfig().disableSysoutLogging();
    +
    +           DataSet<String> result = env.createInput(new 
TestNonRichInputFormat());
    +           result.output(new LocalCollectionOutputFormat<String>(new 
ArrayList<String>()));
    +           try {
    +                   env.execute();
    +                   Assert.fail("Program should not run successfully, cause 
of invalid akka settings.");
    +           } catch (ProgramInvocationException ex) {
    +                   throw ex.getCause();
    +           }
    +   }
    +
    +   /**
    +    * Ensure that the program parallelism can be set even if the 
configuration is supplied.
    +    */
    +   @Test
    +   public void testUserSpecificParallelism() throws Exception {
    +           Configuration config = new Configuration();
    +           config.setString(ConfigConstants.AKKA_STARTUP_TIMEOUT, 
VALID_STARTUP_TIMEOUT);
    +
    +           final ExecutionEnvironment env = 
ExecutionEnvironment.createRemoteEnvironment(
    +                           cluster.hostname(),
    +                           cluster.getLeaderRPCPort(),
    +                           config
    +           );
    +           env.setParallelism(USER_DOP);
    +           env.getConfig().disableSysoutLogging();
    +
    +           DataSet<Integer> result = env.createInput(new 
ParallelismDependentInputFormat())
    +                           .rebalance()
    +                           .mapPartition(new 
RichMapPartitionFunction<Integer, Integer>() {
    +                                   @Override
    +                                   public void 
mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception 
{
    +                                           
out.collect(getRuntimeContext().getIndexOfThisSubtask());
    +                                   }
    +                           });
    +           List<Integer> resultCollection = new ArrayList<Integer>();
    +           result.output(new 
LocalCollectionOutputFormat<Integer>(resultCollection));
    --- End diff --
    
    Here you can also use `result.collect()` to obtain a list of integers.


> Add configuration parameter to createRemoteEnvironment method
> -------------------------------------------------------------
>
>                 Key: FLINK-2373
>                 URL: https://issues.apache.org/jira/browse/FLINK-2373
>             Project: Flink
>          Issue Type: Bug
>          Components: other
>            Reporter: Andreas Kunft
>            Priority: Minor
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> Currently there is no way to provide a custom configuration upon creation of 
> a remote environment (via ExecutionEnvironment.createRemoteEnvironment(...)).
> This leads to errors when the submitted job exceeds the default maximum 
> payload size in Akka, because the configuration value cannot be increased 
> (akka.remote.OversizedPayloadException: Discarding oversized payload...).
> Providing an overloaded method with a configuration parameter for the remote 
> environment fixes that.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to