Repository: beam Updated Branches: refs/heads/master b21bdf475 -> 34b4a6d9d
BEAM-980 Support configuration of Apex DAG through properties file. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/31c63cb8 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/31c63cb8 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/31c63cb8 Branch: refs/heads/master Commit: 31c63cb8c14ea71ed45376d19b4fd9f285d80763 Parents: 1c6e667 Author: Thomas Weise <[email protected]> Authored: Wed Jan 25 22:22:36 2017 -0800 Committer: Thomas Weise <[email protected]> Committed: Thu Jan 26 22:54:00 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/apex/ApexPipelineOptions.java | 7 +- .../apache/beam/runners/apex/ApexRunner.java | 43 ++++++++--- .../beam/runners/apex/ApexYarnLauncher.java | 23 +++++- .../beam/runners/apex/ApexRunnerTest.java | 75 ++++++++++++++++++++ .../beam/runners/apex/ApexYarnLauncherTest.java | 9 ++- .../test/resources/beam-runners-apex.properties | 20 ++++++ 6 files changed, 161 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java index 54fdf76..f37e874 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java @@ -56,5 +56,10 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab @Default.Long(0) long getRunMillis(); -} + @Description("configuration properties file for the Apex engine") + void setConfigFile(String name); + + @Default.String("classpath:/beam-runners-apex.properties") + String getConfigFile(); +} http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java index f12ebef..e220e6c 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java @@ -22,10 +22,16 @@ import com.datatorrent.api.Context.DAGContext; import com.datatorrent.api.DAG; import com.datatorrent.api.StreamingApplication; import com.google.common.base.Throwables; + +import java.io.File; import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.net.URISyntaxException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; import org.apache.apex.api.EmbeddedAppLauncher; import org.apache.apex.api.Launcher; @@ -64,6 +70,7 @@ import org.apache.hadoop.conf.Configuration; public class ApexRunner extends PipelineRunner<ApexRunnerResult> { private final ApexPipelineOptions options; + public static final String CLASSPATH_SCHEME = "classpath"; /** * TODO: this isn't thread safe and may cause issues when tests run in parallel @@ -126,6 +133,31 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } }; + Properties configProperties = new Properties(); + try { + if (options.getConfigFile() != null) { + URI configURL = new URI(options.getConfigFile()); + if (CLASSPATH_SCHEME.equals(configURL.getScheme())) { + InputStream is = this.getClass().getResourceAsStream(configURL.getPath()); + if (is != null) { + configProperties.load(is); + is.close(); + } + } else { + if (!configURL.isAbsolute()) { + // resolve as local file name + File f = new File(options.getConfigFile()); + configURL = f.toURI(); + } + try (InputStream is = configURL.toURL().openStream()) { + configProperties.load(is); + } + } + } + } catch (IOException | URISyntaxException ex) { + throw new RuntimeException("Error loading properties", ex); + } + if (options.isEmbeddedExecution()) { Launcher<AppHandle> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED); Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); @@ -135,6 +167,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { launchAttributes.put(EmbeddedAppLauncher.HEARTBEAT_MONITORING, false); } Configuration conf = new Configuration(false); + ApexYarnLauncher.addProperties(conf, configProperties); try { ApexRunner.ASSERTION_ERROR.set(null); AppHandle apexAppResult = launcher.launchApp(apexApp, conf, launchAttributes); @@ -146,7 +179,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } else { try { ApexYarnLauncher yarnLauncher = new ApexYarnLauncher(); - AppHandle apexAppResult = yarnLauncher.launchApp(apexApp); + AppHandle apexAppResult = yarnLauncher.launchApp(apexApp, configProperties); return new ApexRunnerResult(apexDAG.get(), apexAppResult); } catch (IOException e) { throw new RuntimeException("Failed to launch the application on YARN.", e); @@ -155,14 +188,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> { } - private static class IdentityFn<T> extends DoFn<T, T> { - private static final long serialVersionUID = 1L; - @ProcessElement - public void processElement(ProcessContext c) { - c.output(c.element()); - } - } - //////////////////////////////////////////// // Adapted from FlinkRunner for View support http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java index a2d88f4..6bc42f0 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexYarnLauncher.java @@ -52,6 +52,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.Set; import java.util.jar.JarFile; import java.util.jar.Manifest; @@ -80,7 +81,8 @@ import org.slf4j.LoggerFactory; public class ApexYarnLauncher { private static final Logger LOG = LoggerFactory.getLogger(ApexYarnLauncher.class); - public AppHandle launchApp(StreamingApplication app) throws IOException { + public AppHandle launchApp(StreamingApplication app, Properties configProperties) + throws IOException { List<File> jarsToShip = getYarnDeployDependencies(); StringBuilder classpath = new StringBuilder(); @@ -103,7 +105,7 @@ public class ApexYarnLauncher { Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); launchAttributes.put(YarnAppLauncher.LIB_JARS, classpath.toString().replace(':', ',')); - LaunchParams lp = new LaunchParams(dag, launchAttributes); + LaunchParams lp = new LaunchParams(dag, launchAttributes, configProperties); lp.cmd = "hadoop " + ApexYarnLauncher.class.getName(); HashMap<String, String> env = new HashMap<>(); env.put("HADOOP_USER_CLASSPATH_FIRST", "1"); @@ -292,6 +294,18 @@ public class ApexYarnLauncher { } /** + * Transfer the properties to the configuration object. + * @param conf + * @param props + */ + public static void addProperties(Configuration conf, Properties props) { + for (final String propertyName : props.stringPropertyNames()) { + String propertyValue = props.getProperty(propertyName); + conf.set(propertyName, propertyValue); + } + } + + /** * The main method expects the serialized DAG and will launch the YARN application. * @param args location of launch parameters * @throws IOException when parameters cannot be read @@ -309,6 +323,7 @@ public class ApexYarnLauncher { } }; Configuration conf = new Configuration(); // configuration from Hadoop client + addProperties(conf, params.configProperties); AppHandle appHandle = params.getApexLauncher().launchApp(apexApp, conf, params.launchAttributes); if (appHandle == null) { @@ -327,12 +342,14 @@ public class ApexYarnLauncher { private static final long serialVersionUID = 1L; private final DAG dag; private final Attribute.AttributeMap launchAttributes; + private final Properties configProperties; private HashMap<String, String> env; private String cmd; - protected LaunchParams(DAG dag, AttributeMap launchAttributes) { + protected LaunchParams(DAG dag, AttributeMap launchAttributes, Properties configProperties) { this.dag = dag; this.launchAttributes = launchAttributes; + this.configProperties = configProperties; } protected Launcher<?> getApexLauncher() { http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java new file mode 100644 index 0000000..436c959 --- /dev/null +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.apex; + +import com.datatorrent.api.DAG; +import com.datatorrent.api.DAG.OperatorMeta; +import com.datatorrent.stram.engine.OperatorContext; +import java.io.File; +import java.io.FileOutputStream; +import java.util.Collections; +import java.util.Properties; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.junit.Assert; +import org.junit.Test; + +/** + * Tests for the Apex runner. + */ +public class ApexRunnerTest { + + @Test + public void testConfigProperties() throws Exception { + + String operName = "testProperties"; + ApexPipelineOptions options = PipelineOptionsFactory.create() + .as(ApexPipelineOptions.class); + options.setRunner(ApexRunner.class); + + // default configuration from class path + Pipeline p = Pipeline.create(options); + p.apply(operName, Create.of(Collections.emptyList())); + ApexRunnerResult result = (ApexRunnerResult) p.run(); + result.cancel(); + + DAG dag = result.getApexDAG(); + OperatorMeta t1Meta = dag.getOperatorMeta(operName); + Assert.assertNotNull(t1Meta); + Assert.assertEquals(new Integer(32), t1Meta.getValue(OperatorContext.MEMORY_MB)); + + File tmp = File.createTempFile("beam-runners-apex-", ".properties"); + tmp.deleteOnExit(); + Properties props = new Properties(); + props.setProperty("dt.operator." + operName + ".attr.MEMORY_MB", "64"); + try (FileOutputStream fos = new FileOutputStream(tmp)) { + props.store(fos, ""); + } + options.setConfigFile(tmp.getAbsolutePath()); + result = (ApexRunnerResult) p.run(); + result.cancel(); + tmp.delete(); + dag = result.getApexDAG(); + t1Meta = dag.getOperatorMeta(operName); + Assert.assertNotNull(t1Meta); + Assert.assertEquals(new Integer(64), t1Meta.getValue(OperatorContext.MEMORY_MB)); + + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java index 986818b..6ffb091 100644 --- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java +++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexYarnLauncherTest.java @@ -35,6 +35,7 @@ import java.nio.file.Files; import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.jar.JarFile; import org.apache.apex.api.EmbeddedAppLauncher; @@ -78,15 +79,17 @@ public class ApexYarnLauncherTest { Configuration conf = new Configuration(false); DAG dag = embeddedLauncher.prepareDAG(app, conf); Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap(); + Properties configProperties = new Properties(); ApexYarnLauncher launcher = new ApexYarnLauncher(); - launcher.launchApp(new MockApexYarnLauncherParams(dag, launchAttributes)); + launcher.launchApp(new MockApexYarnLauncherParams(dag, launchAttributes, configProperties)); } private static class MockApexYarnLauncherParams extends ApexYarnLauncher.LaunchParams { private static final long serialVersionUID = 1L; - public MockApexYarnLauncherParams(DAG dag, AttributeMap launchAttributes) { - super(dag, launchAttributes); + public MockApexYarnLauncherParams(DAG dag, AttributeMap launchAttributes, + Properties properties) { + super(dag, launchAttributes, properties); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/31c63cb8/runners/apex/src/test/resources/beam-runners-apex.properties ---------------------------------------------------------------------- diff --git a/runners/apex/src/test/resources/beam-runners-apex.properties b/runners/apex/src/test/resources/beam-runners-apex.properties new file mode 100644 index 0000000..48f8b05 --- /dev/null +++ b/runners/apex/src/test/resources/beam-runners-apex.properties @@ -0,0 +1,20 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +# properties for unit test +dt.operator.testProperties.attr.MEMORY_MB=32
