[
https://issues.apache.org/jira/browse/BEAM-2871?focusedWorklogId=90002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90002
]
ASF GitHub Bot logged work on BEAM-2871:
----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Apr/18 15:36
Start Date: 11/Apr/18 15:36
Worklog Time Spent: 10m
Work Description: reuvenlax closed pull request #3961: [BEAM-2871] Add
advanced examples of running external libraries on workers
URL: https://github.com/apache/beam/pull/3961
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/Echo.cc b/examples/java/src/main/java/org/apache/beam/examples/subprocess/Echo.cc
new file mode 100644
index 00000000000..4ccb4dfef93
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/Echo.cc
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// Echo program: writes the word given as the second argument into the file named by the first.
+
+#include <iostream>
+#include <fstream>
+
+int main(int argc, char* argv[])
+{
+  // argv[1]: path of the result file that the calling kernel reads back after we exit.
+  // argv[2]: the word to echo.
+  if (argc < 3) {
+    std::cerr << "No parameter sent, must send the return file location and a statement to echo" << '\n';
+    return 1;
+  }
+  std::ofstream resultFile(argv[1]);
+  if (!resultFile) {
+    std::cerr << "Unable to open result file " << argv[1] << '\n';
+    return 1;
+  }
+  resultFile << argv[2];
+  resultFile.close();
+  // Report write/flush failures through the exit code so the caller treats them as errors.
+  return resultFile.fail() ? 1 : 0;
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/EchoAgain.cc b/examples/java/src/main/java/org/apache/beam/examples/subprocess/EchoAgain.cc
new file mode 100644
index 00000000000..419bf405da3
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/EchoAgain.cc
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// EchoAgain program: writes a fixed greeting followed by the word given as the second argument
+// into the file named by the first.
+
+#include <iostream>
+#include <fstream>
+
+int main(int argc, char* argv[])
+{
+  // argv[1]: path of the result file that the calling kernel reads back after we exit.
+  // argv[2]: the word to echo.
+  if (argc < 3) {
+    std::cerr << "No parameter sent, must send the return file location and a statement to echo" << '\n';
+    return 1;
+  }
+  std::ofstream resultFile(argv[1]);
+  if (!resultFile) {
+    std::cerr << "Unable to open result file " << argv[1] << '\n';
+    return 1;
+  }
+  // Keep a space between the greeting and the echoed word so they do not run together.
+  resultFile << "You again? Well ok, here is your word again. " << argv[2];
+  resultFile.close();
+  // Report write/flush failures through the exit code so the caller treats them as errors.
+  return resultFile.fail() ? 1 : 0;
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java
new file mode 100644
index 00000000000..7afda9ef3a8
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess;
+
+import java.util.ArrayList;
+import java.util.List;
+import
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs;
+import
org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs.Command;
+import org.apache.beam.examples.subprocess.kernel.SubProcessKernel;
+import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * In this example batch pipeline we will invoke a simple Echo C++ library within a DoFn. The
+ * sample makes use of the {@link EchoInputDoFn} class, which delegates the setup and processing of
+ * the executable, its logs and its results to the sub-process kernel.
+ *
+ * <p>For this example we are using commands passed to the library based on ordinal position, but
+ * for a production system you should use a mechanism like ProtoBuffers with Base64 encoding to
+ * pass the parameters to the library.
+ *
+ * <p>To test this example you will need to build the files Echo.cc and EchoAgain.cc in a linux env
+ * matching the runner that you are using (using g++ with the static option). Once built, copy them
+ * to the SourcePath defined in {@link SubProcessPipelineOptions}.
+ */
+public class ExampleEchoPipeline {
+  static final Logger LOG = LoggerFactory.getLogger(ExampleEchoPipeline.class);
+
+  public static void main(String[] args) throws Exception {
+
+    // Read in the options for the pipeline
+    SubProcessPipelineOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(SubProcessPipelineOptions.class);
+
+    Pipeline p = Pipeline.create(options);
+
+    // Setup the Configuration option used with all transforms
+    SubProcessConfiguration configuration = options.getSubProcessConfiguration();
+
+    // Create some sample data to be fed to our c++ Echo library
+    List<KV<String, String>> sampleData = new ArrayList<>();
+    for (int i = 0; i < 10000; i++) {
+      String str = String.valueOf(i);
+      sampleData.add(KV.of(str, str));
+    }
+
+    // Define the pipeline: two transforms, each passing every element's value through an external
+    // binary and emitting the lines it returns under the element's original key.
+    p.apply(Create.of(sampleData))
+        .apply("Echo inputs round 1", ParDo.of(new EchoInputDoFn(configuration, "Echo")))
+        .apply("Echo inputs round 2", ParDo.of(new EchoInputDoFn(configuration, "EchoAgain")));
+
+    p.run();
+  }
+
+  /**
+   * Simple DoFn that echoes the element, used as an example of running a C++ library.
+   */
+  @SuppressWarnings("serial")
+  public static class EchoInputDoFn extends DoFn<KV<String, String>, KV<String, String>> {
+
+    static final Logger LOG = LoggerFactory.getLogger(EchoInputDoFn.class);
+
+    private final SubProcessConfiguration configuration;
+    private final String binaryName;
+
+    /**
+     * Creates the DoFn.
+     *
+     * @param configuration sub-process configuration shared by all transforms in the pipeline
+     * @param binary name of the executable to run; it is prepared on the worker in {@link #setUp()}
+     */
+    public EchoInputDoFn(SubProcessConfiguration configuration, String binary) {
+      this.configuration = configuration;
+      this.binaryName = binary;
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+      CallingSubProcessUtils.setUp(configuration, binaryName);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      try {
+        // Our Library takes a single command in position 0 which it will echo back in the result
+        SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
+        Command command = new Command(0, String.valueOf(c.element().getValue()));
+        commands.putCommand(command);
+
+        // The SubProcessKernel deals with the execution of the process
+        SubProcessKernel kernel = new SubProcessKernel(configuration, binaryName);
+
+        // Run the command and emit one output per line of the result file
+        List<String> results = kernel.exec(commands);
+        for (String s : results) {
+          c.output(KV.of(c.element().getKey(), s));
+        }
+      } catch (Exception ex) {
+        LOG.error("Error processing element ", ex);
+        throw ex;
+      }
+    }
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java
new file mode 100644
index 00000000000..85bd4496d08
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess;
+
+
+import
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation.Required;
+
+/**
+ * Options for running a sub process within a DoFn.
+ */
+public interface SubProcessPipelineOptions extends PipelineOptions {
+
+  @Description("Source GCS directory where the C++ library is located gs://bucket/tests")
+  @Required
+  String getSourcePath();
+
+  void setSourcePath(String sourcePath);
+
+  @Description("Working directory for the process I/O")
+  @Default.String("/tmp/grid_working_files")
+  String getWorkerPath();
+
+  void setWorkerPath(String workerPath);
+
+  @Description("The maximum time to wait for the sub-process to complete")
+  @Default.Integer(3600)
+  Integer getWaitTime();
+
+  void setWaitTime(Integer waitTime);
+
+  @Description("As sub-processes can be heavy weight define the level of concurrency level")
+  @Required
+  Integer getConcurrency();
+
+  void setConcurrency(Integer concurrency);
+
+  @Description("Should log files only be uploaded if error.")
+  @Default.Boolean(true)
+  Boolean getOnlyUpLoadLogsOnError();
+
+  void setOnlyUpLoadLogsOnError(Boolean onlyUpLoadLogsOnError);
+
+  @Default.InstanceFactory(SubProcessConfigurationFactory.class)
+  SubProcessConfiguration getSubProcessConfiguration();
+
+  void setSubProcessConfiguration(SubProcessConfiguration configuration);
+
+  /**
+   * Validates the options and copies them into the serializable configuration used by the
+   * pipeline's transforms.
+   */
+  class SubProcessConfigurationFactory implements DefaultValueFactory<SubProcessConfiguration> {
+
+    /**
+     * Builds the configuration from {@code options}.
+     *
+     * @throws IllegalStateException if the source path is missing, or the concurrency or wait time
+     *     is missing or not strictly positive
+     */
+    @Override
+    public SubProcessConfiguration create(PipelineOptions options) {
+
+      SubProcessPipelineOptions subProcessPipelineOptions = (SubProcessPipelineOptions) options;
+
+      if (subProcessPipelineOptions.getSourcePath() == null) {
+        throw new IllegalStateException("Source path must be set");
+      }
+
+      // Negative values are rejected as well: a concurrency level must be strictly positive.
+      Integer concurrency = subProcessPipelineOptions.getConcurrency();
+      if (concurrency == null || concurrency <= 0) {
+        throw new IllegalStateException("Concurrency must be set and be > 0");
+      }
+
+      // A non-positive wait would make every sub-process time out immediately.
+      Integer waitTime = subProcessPipelineOptions.getWaitTime();
+      if (waitTime == null || waitTime <= 0) {
+        throw new IllegalStateException("Wait time must be set and be > 0");
+      }
+
+      SubProcessConfiguration configuration = new SubProcessConfiguration();
+      configuration.setSourcePath(subProcessPipelineOptions.getSourcePath());
+      configuration.setWorkerPath(subProcessPipelineOptions.getWorkerPath());
+      configuration.setWaitTime(waitTime);
+      configuration.setOnlyUpLoadLogsOnError(subProcessPipelineOptions.getOnlyUpLoadLogsOnError());
+      configuration.setConcurrency(concurrency);
+
+      return configuration;
+    }
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java
new file mode 100644
index 00000000000..b55da9a3176
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.configuration;
+
+import java.io.Serializable;
+
+/**
+ * Configuration used to set up the process kernel that executes the external library.
+ *
+ * <p>Values are copied out of the pipeline options into this plain {@link Serializable} holder so
+ * that they can travel to the workers together with the transforms that use them.
+ */
+@SuppressWarnings("serial")
+public class SubProcessConfiguration implements Serializable {
+
+  /** Source GCS directory where the C++ library is located, e.g. gs://bucket/tests. */
+  public String sourcePath;
+
+  /** Working directory on the worker for the process I/O files. */
+  public String workerPath;
+
+  /** Maximum time, in seconds, to wait for the sub-process to complete. */
+  public Integer waitTime;
+
+  /**
+   * Concurrency level for the sub-processes; as they can be heavyweight, match it to the number of
+   * cores on the worker machines.
+   */
+  public Integer concurrency;
+
+  /** Whether log files should only be uploaded when an error occurs. */
+  public Boolean onlyUpLoadLogsOnError;
+
+  public String getSourcePath() {
+    return this.sourcePath;
+  }
+
+  public void setSourcePath(String sourcePath) {
+    this.sourcePath = sourcePath;
+  }
+
+  public String getWorkerPath() {
+    return this.workerPath;
+  }
+
+  public void setWorkerPath(String workerPath) {
+    this.workerPath = workerPath;
+  }
+
+  public Integer getWaitTime() {
+    return this.waitTime;
+  }
+
+  public void setWaitTime(Integer waitTime) {
+    this.waitTime = waitTime;
+  }
+
+  public Integer getConcurrency() {
+    return this.concurrency;
+  }
+
+  public void setConcurrency(Integer concurrency) {
+    this.concurrency = concurrency;
+  }
+
+  public Boolean getOnlyUpLoadLogsOnError() {
+    return this.onlyUpLoadLogsOnError;
+  }
+
+  public void setOnlyUpLoadLogsOnError(Boolean onlyUpLoadLogsOnError) {
+    this.onlyUpLoadLogsOnError = onlyUpLoadLogsOnError;
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessCommandLineArgs.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessCommandLineArgs.java
new file mode 100644
index 00000000000..dd82ee91de9
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessCommandLineArgs.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.kernel;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+
+/**
+ * Parameters to the sub-process, has tuple of ordinal position and the value.
+ */
+public class SubProcessCommandLineArgs {
+
+ // Parameters to pass to the sub-process
+ private List<Command> parameters = Lists.newArrayList();
+
+ public void addCommand(Integer position, String value) {
+ parameters.add(new Command(position, value));
+ }
+
+ public void putCommand(Command command) {
+ parameters.add(command);
+ }
+
+ public List<Command> getParameters() {
+ return parameters;
+ }
+
+ /**
+ * Class used to store the SubProcces parameters.
+ */
+ public static class Command {
+
+ // The ordinal position of the command to pass to the sub-process
+ int ordinalPosition;
+ String value;
+
+ @SuppressWarnings("unused")
+ private Command(){}
+
+ public Command(int ordinalPosition, String value) {
+ this.ordinalPosition = ordinalPosition;
+ this.value = value;
+ }
+
+ public int getKey() {
+ return ordinalPosition;
+ }
+
+ public void setKey(int key) {
+ this.ordinalPosition = key;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ public void setValue(String value) {
+ this.value = value;
+ }
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessIOFiles.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessIOFiles.java
new file mode 100644
index 00000000000..82f8e3a7696
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessIOFiles.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.kernel;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+import
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.examples.subprocess.utils.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * All information generated from the process will be stored in output files.
The local working
+ * directory is used to generate three files with extension .err for standard
error output .out for
+ * standard out output .ret for storing the results from the called library.
The files will have a
+ * uuid created for them based on java.util.UUID
+ */
+public class SubProcessIOFiles implements Closeable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SubProcessIOFiles.class);
+
+ Path errFile;
+ Path outFile;
+ Path resultFile;
+ Path base;
+
+ String errFileLocation = "";
+ String outFileLocation = "";
+ String uuid;
+
+ public String getErrFileLocation() {
+ return errFileLocation;
+ }
+
+ public String getOutFileLocation() {
+ return outFileLocation;
+ }
+
+ /**
+ * @param workerWorkingDirectory
+ */
+ public SubProcessIOFiles(String workerWorkingDirectory) {
+
+ this.uuid = UUID.randomUUID().toString();
+ base = Paths.get(workerWorkingDirectory);
+
+ // Setup all the redirect handles, including the return file type
+ errFile = Paths.get(base.toString(), uuid + ".err");
+ outFile = Paths.get(base.toString(), uuid + ".out");
+ resultFile = Paths.get(base.toString(), uuid + ".res");
+
+ }
+
+ public Path getErrFile() {
+ return errFile;
+ }
+
+ public Path getOutFile() {
+ return outFile;
+ }
+
+ public Path getResultFile() {
+ return resultFile;
+ }
+
+ /**
+ * Clean up the files that have been created on the local worker file system.
+ * Without this expect both performance issues and eventual failure
+ */
+ @Override
+ public void close() throws IOException {
+
+ if (Files.exists(outFile)) {
+ Files.delete(outFile);
+ }
+
+ if (Files.exists(errFile)) {
+ Files.delete(errFile);
+ }
+
+ if (Files.exists(resultFile)) {
+ Files.delete(resultFile);
+ }
+ }
+
+ /**
+ * Will copy the output files to the GCS path setup via the configuration.
+ * @param configuration
+ * @param params
+ */
+ public void copyOutPutFilesToBucket(SubProcessConfiguration configuration,
String params) {
+ if (Files.exists(outFile) || Files.exists(errFile)) {
+ try {
+ outFileLocation = FileUtils.copyFileFromWorkerToGCS(configuration,
outFile);
+ } catch (Exception ex) {
+ LOG.error("Error uploading log file to storage ", ex);
+ }
+
+ try {
+ errFileLocation = FileUtils.copyFileFromWorkerToGCS(configuration,
errFile);
+ } catch (Exception ex) {
+ LOG.error("Error uploading log file to storage ", ex);
+ }
+
+ LOG.info(String.format("Log Files for process: %s outFile was: %s
errFile was: %s", params,
+ outFileLocation, errFileLocation));
+ } else {
+ LOG.error(String.format("There was no output file or err file for
process %s", params));
+ }
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessKernel.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessKernel.java
new file mode 100644
index 00000000000..e45bf5bf2ee
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessKernel.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.kernel;
+
+import java.io.IOException;
+import java.lang.ProcessBuilder.Redirect;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils;
+import org.apache.beam.examples.subprocess.utils.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the process kernel which deals with exec of the subprocess.
+ * It also deals with all I/O.
+ */
+public class SubProcessKernel {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(SubProcessKernel.class);
+
+ private static final int MAX_SIZE_COMMAND_LINE_ARGS = 128 * 1024;
+
+ SubProcessConfiguration configuration;
+ ProcessBuilder processBuilder;
+
+ private SubProcessKernel() {}
+
+ /**
+ * Creates the SubProcess Kernel ready for execution.
+ * Will deal with all input and outputs to the SubProcess
+ * @param options
+ * @param binaryName
+ */
+ public SubProcessKernel(SubProcessConfiguration options, String binaryName) {
+ this.configuration = options;
+ this.processBuilder = new ProcessBuilder(binaryName);
+ }
+
+ public List<String> exec(SubProcessCommandLineArgs commands) throws
Exception {
+ try (CallingSubProcessUtils.Permit permit =
+ new CallingSubProcessUtils.Permit(processBuilder.command().get(0))) {
+
+ List<String> results = null;
+
+ try (SubProcessIOFiles outputFiles = new
SubProcessIOFiles(configuration.getWorkerPath())) {
+
+ try {
+ Process process = execBinary(processBuilder, commands, outputFiles);
+ results = collectProcessResults(process, processBuilder,
outputFiles);
+ } catch (Exception ex) {
+ LOG.error("Error running executable ", ex);
+ throw (ex);
+ }
+ } catch (IOException ex) {
+ LOG.error(
+ "Unable to delete the outputfiles. This can lead to performance
issues and failure",
+ ex);
+ }
+ return results;
+ }
+ }
+
+ public byte[] execBinaryResult(SubProcessCommandLineArgs commands) throws
Exception {
+ try (CallingSubProcessUtils.Permit permit =
+ new CallingSubProcessUtils.Permit(processBuilder.command().get(0))) {
+
+
+ try (SubProcessIOFiles outputFiles = new
SubProcessIOFiles(configuration.getWorkerPath())) {
+
+ try {
+ Process process = execBinary(processBuilder, commands, outputFiles);
+ return collectProcessResultsBytes(process, processBuilder,
outputFiles);
+ } catch (Exception ex) {
+ LOG.error("Error running executable ", ex);
+ throw (ex);
+ }
+ } catch (IOException ex) {
+ LOG.error(
+ "Unable to delete the outputfiles. This can lead to performance
issues and failure",
+ ex);
+ }
+ return new byte[0];
+ }
+ }
+
+ private ProcessBuilder prepareBuilder(ProcessBuilder builder,
SubProcessCommandLineArgs commands,
+ SubProcessIOFiles outPutFiles) throws IllegalStateException {
+
+ // Check we are not over the max size of command line parameters
+ if (getTotalCommandBytes(commands) > MAX_SIZE_COMMAND_LINE_ARGS) {
+ throw new IllegalStateException("Command is over 2MB in size");
+ }
+
+ appendExecutablePath(builder);
+
+ // Add the result file path to the builder at position 1, 0 is reserved
for the process itself
+ builder.command().add(1, outPutFiles.resultFile.toString());
+
+ // Shift commands by 2 ordinal positions and load into the builder
+ if (commands != null) {
+ for (SubProcessCommandLineArgs.Command s : commands.getParameters()) {
+ builder.command().add(s.ordinalPosition + 2, s.value);
+ }
+ }
+
+ builder.redirectError(Redirect.appendTo(outPutFiles.errFile.toFile()));
+ builder.redirectOutput(Redirect.appendTo(outPutFiles.outFile.toFile()));
+
+ return builder;
+ }
+
+ /**
+ * Add up the total bytes used by the process.
+ * @param commands
+ * @return
+ */
+ private int getTotalCommandBytes(SubProcessCommandLineArgs commands) {
+ int size = 0;
+ for (SubProcessCommandLineArgs.Command c : commands.getParameters()) {
+ size += c.value.length();
+ }
+ return size;
+ }
+
+ private Process execBinary(ProcessBuilder builder, SubProcessCommandLineArgs
commands,
+ SubProcessIOFiles outPutFiles) throws Exception {
+ try {
+
+ builder = prepareBuilder(builder, commands, outPutFiles);
+ Process process = builder.start();
+
+ boolean timeout = !process.waitFor(configuration.getWaitTime(),
TimeUnit.SECONDS);
+
+ if (timeout) {
+ String log = String.format(
+ "Timeout waiting to run process with parameters %s . "
+ + "Check to see if your timeout is long enough. Currently set
at %s.",
+ createLogEntryFromInputs(builder.command()),
configuration.getWaitTime());
+ throw new Exception(log);
+ }
+ return process;
+
+ } catch (Exception ex) {
+
+ LOG.error(String.format("Error running process with parameters %s error
was %s ",
+ createLogEntryFromInputs(builder.command()), ex.getMessage()));
+ throw new Exception(ex);
+
+ }
+ }
+
+ /**
+ * TODO clean up duplicate with byte[] version collectBinaryProcessResults.
+ * @param process
+ * @param builder
+ * @param outPutFiles
+ * @return List of results
+ * @throws Exception if process has non 0 value or no logs found then throw
exception
+ */
+ private List<String> collectProcessResults(Process process, ProcessBuilder
builder,
+ SubProcessIOFiles outPutFiles) throws Exception {
+
+ List<String> results = new ArrayList<>();
+
+ try {
+
+ LOG.debug(String.format("Executing process %s",
createLogEntryFromInputs(builder.command())));
+
+ // If process exit value is not 0 then subprocess failed, record logs
+ if (process.exitValue() != 0) {
+ outPutFiles.copyOutPutFilesToBucket(configuration,
FileUtils.toStringParams(builder));
+ String log = createLogEntryForProcessFailure(process,
builder.command(), outPutFiles);
+ throw new Exception(log);
+ }
+
+ // If no return file then either something went wrong or the binary is
setup incorrectly for
+ // the ret file either way throw error
+ if (!Files.exists(outPutFiles.resultFile)) {
+ String log = createLogEntryForProcessFailure(process,
builder.command(), outPutFiles);
+ outPutFiles.copyOutPutFilesToBucket(configuration,
FileUtils.toStringParams(builder));
+ throw new Exception(log);
+ }
+
+ // Everything looks healthy return values
+ try (Stream<String> lines = Files.lines(outPutFiles.resultFile)) {
+ for (String line : (Iterable<String>) lines::iterator) {
+ results.add(line);
+ }
+ }
+ return results;
+ } catch (Exception ex) {
+ String log = String.format("Unexpected error runnng process. %s error
message was %s",
+ createLogEntryFromInputs(builder.command()), ex.getMessage());
+ throw new Exception(log);
+ }
+ }
+
+ /**
+ * Used when the reault file contains binary data.
+ * @param process
+ * @param builder
+ * @param outPutFiles
+ * @return Binary results
+ * @throws Exception if process has non 0 value or no logs found then throw
exception
+ */
+ private byte[] collectProcessResultsBytes(Process process, ProcessBuilder
builder,
+ SubProcessIOFiles outPutFiles) throws Exception {
+
+ Byte[] results;
+
+ try {
+
+ LOG.debug(String.format("Executing process %s",
createLogEntryFromInputs(builder.command())));
+
+ // If process exit value is not 0 then subprocess failed, record logs
+ if (process.exitValue() != 0) {
+ outPutFiles.copyOutPutFilesToBucket(configuration,
FileUtils.toStringParams(builder));
+ String log = createLogEntryForProcessFailure(process,
builder.command(), outPutFiles);
+ throw new Exception(log);
+ }
+
+ // If no return file then either something went wrong or the binary is
setup incorrectly for
+ // the ret file either way throw error
+ if (!Files.exists(outPutFiles.resultFile)) {
+ String log = createLogEntryForProcessFailure(process,
builder.command(), outPutFiles);
+ outPutFiles.copyOutPutFilesToBucket(configuration,
FileUtils.toStringParams(builder));
+ throw new Exception(log);
+ }
+
+ // Everything looks healthy return bytes
+ return Files.readAllBytes(outPutFiles.resultFile);
+
+ } catch (Exception ex) {
+ String log = String.format("Unexpected error runnng process. %s error
message was %s",
+ createLogEntryFromInputs(builder.command()), ex.getMessage());
+ throw new Exception(log);
+ }
+ }
+
+ private static String createLogEntryForProcessFailure(Process process,
List<String> commands,
+ SubProcessIOFiles files) {
+
+ StringBuilder stringBuilder = new StringBuilder();
+
+ // Highlight when no result file is found vs standard process error
+ if (process.exitValue() == 0) {
+ stringBuilder.append(
+ String.format("%nProcess succeded but no result file was found %n"));
+ } else {
+ stringBuilder.append(
+ String.format("%nProcess error failed with exit value of %s %n",
process.exitValue()));
+ }
+
+ stringBuilder
+ .append(String.format("Command info was %s %n",
createLogEntryFromInputs(commands)));
+
+ stringBuilder.append(String.format("First line of error file is %s %n",
+ FileUtils.readLineOfLogFile(files.errFile)));
+
+ stringBuilder.append(String.format("First line of out file is %s %n",
+ FileUtils.readLineOfLogFile(files.outFile)));
+
+ stringBuilder.append(String.format("First line of ret file is %s %n",
+ FileUtils.readLineOfLogFile(files.resultFile)));
+
+ return stringBuilder.toString();
+
+ }
+
+ private static String createLogEntryFromInputs(List<String> commands) {
+ String params;
+ if (commands != null) {
+ params = String.join(",", commands);
+ } else {
+ params = "No-Commands";
+ }
+ return params;
+ }
+
+ // Pass the Path of the binary to the SubProcess in Command position 0
+ private ProcessBuilder appendExecutablePath(ProcessBuilder builder) {
+ String executable = builder.command().get(0);
+ if (executable == null) {
+ throw new IllegalArgumentException(
+ "No executable provided to the Process Builder... we will do...
nothing... ");
+ }
+ builder.command().set(0,
+ FileUtils.getFileResourceId(configuration.getWorkerPath(),
executable).toString());
+ return builder;
+ }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/CallingSubProcessUtils.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/CallingSubProcessUtils.java
new file mode 100644
index 00000000000..49e3ab7839b
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/CallingSubProcessUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.utils;
+
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for dealing with concurrency and binary file copies to the
worker.
+ */
+public class CallingSubProcessUtils {
+
+ // Prevent Instantiation
+ private CallingSubProcessUtils() {}
+
+ static final Logger LOG =
LoggerFactory.getLogger(CallingSubProcessUtils.class);
+
+ static boolean initCompleted = false;
+
+ // Allow multiple subclasses to create files, but only one thread per
subclass can add the file to
+ // the worker
+ private static final Set<String> downloadedFiles =
Sets.<String>newConcurrentHashSet();
+
+ // Limit the number of threads able to do work
+ private static Map<String, Semaphore> semaphores = new
ConcurrentHashMap<String, Semaphore>();
+
+ public static void setUp(SubProcessConfiguration configuration, String
binaryName)
+ throws Exception {
+
+ if (!semaphores.containsKey(binaryName)) {
+ initSemaphore(configuration.getConcurrency(), binaryName);
+ }
+
+ synchronized (downloadedFiles) {
+ if (!downloadedFiles.contains(binaryName)) {
+ // Create Directories if needed
+ FileUtils.createDirectoriesOnWorker(configuration);
+ LOG.info("Calling filesetup to move Executables to worker.");
+ ExecutableFile executableFile = new ExecutableFile(configuration,
binaryName);
+ FileUtils.copyFileFromGCSToWorker(executableFile);
+ downloadedFiles.add(binaryName);
+ }
+ }
+ }
+
+ public static synchronized void initSemaphore(Integer permits, String
binaryName) {
+ if (!semaphores.containsKey(binaryName)) {
+ LOG.info(String.format(String.format("Initialized Semaphore for binary
%s ", binaryName)));
+ semaphores.put(binaryName, new Semaphore(permits));
+ }
+ }
+
+ private static void aquireSemaphore(String binaryName) throws
IllegalStateException {
+ if (!semaphores.containsKey(binaryName)) {
+ throw new IllegalStateException("Semaphore is NULL, check init logic in
@Setup.");
+ }
+ try {
+ semaphores.get(binaryName).acquire();
+ } catch (InterruptedException ex) {
+ LOG.error("Interupted during aquire", ex);
+ }
+ }
+
+ private static void releaseSemaphore(String binaryName) throws
IllegalStateException {
+ if (!semaphores.containsKey(binaryName)) {
+ throw new IllegalStateException("Semaphore is NULL, check init logic in
@Setup.");
+ }
+ semaphores.get(binaryName).release();
+ }
+
+ /**
+ * Permit class for access to worker cpu resources.
+ */
+ public static class Permit implements AutoCloseable {
+
+ private String binaryName;
+
+ public Permit(String binaryName){
+ this.binaryName = binaryName;
+ CallingSubProcessUtils.aquireSemaphore(binaryName);
+ }
+
+ @Override
+ public void close() {
+ CallingSubProcessUtils.releaseSemaphore(binaryName);
+ }
+
+ }
+}
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/ExecutableFile.java
b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/ExecutableFile.java
new file mode 100644
index 00000000000..a5a789250cb
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/ExecutableFile.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.utils;
+
+import
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains the configuration for the external library.
+ */
+@DefaultCoder(AvroCoder.class)
+public class ExecutableFile {
+
+ String fileName;
+
+ private String sourceGCSLocation;
+ private String destinationLocation;
+
+ static final Logger LOG = LoggerFactory.getLogger(ExecutableFile.class);
+
+ public String getSourceGCSLocation() {
+ return sourceGCSLocation;
+ }
+
+ public void setSourceGCSLocation(String sourceGCSLocation) {
+ this.sourceGCSLocation = sourceGCSLocation;
+ }
+
+ public String getDestinationLocation() {
+ return destinationLocation;
+ }
+
+ public void setDestinationLocation(String destinationLocation) {
+ this.destinationLocation = destinationLocation;
+ }
+
+ public ExecutableFile(SubProcessConfiguration configuration, String fileName)
+ throws IllegalStateException {
+ if (configuration == null) {
+ throw new IllegalStateException("Configuration can not be NULL");
+ }
+ if (fileName == null) {
+ throw new IllegalStateException("FileName can not be NULLt");
+ }
+ this.fileName = fileName;
+ setDestinationLocation(configuration);
+ setSourceLocation(configuration);
+ }
+
+ private void setDestinationLocation(SubProcessConfiguration configuration) {
+ this.sourceGCSLocation =
+ FileUtils.getFileResourceId(configuration.getSourcePath(),
fileName).toString();
+ }
+
+ private void setSourceLocation(SubProcessConfiguration configuration) {
+ this.destinationLocation =
+ FileUtils.getFileResourceId(configuration.getWorkerPath(),
fileName).toString();
+ }
+}
diff --git
a/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/FileUtils.java
b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/FileUtils.java
new file mode 100644
index 00000000000..07e118b938d
--- /dev/null
+++
b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/FileUtils.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.utils;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for dealing with movement of files from object stores and workers.
+ */
+public class FileUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
+
+ public static ResourceId getFileResourceId(String directory, String
fileName) {
+ ResourceId resourceID = FileSystems.matchNewResource(directory, true);
+ return resourceID.getCurrentDirectory().resolve(fileName,
StandardResolveOptions.RESOLVE_FILE);
+ }
+
+ public static String toStringParams(ProcessBuilder builder) {
+ return (String.join(",", builder.command()));
+ }
+
+ public static String copyFileFromWorkerToGCS(SubProcessConfiguration
configuration,
+ Path fileToUpload) throws Exception {
+
+ Path fileName;
+
+ if ((fileName = fileToUpload.getFileName()) == null) {
+ throw new IllegalArgumentException("FileName can not be null.");
+ }
+
+ ResourceId sourceFile = getFileResourceId(configuration.getWorkerPath(),
fileName.toString());
+
+ LOG.info("Copying file from worker " + sourceFile);
+
+ ResourceId destinationFile =
+ getFileResourceId(configuration.getSourcePath(), fileName.toString());
+ // TODO currently not supported with different schemas for example GCS to
local, else could use
+ // FileSystems.copy(ImmutableList.of(sourceFile),
ImmutableList.of(destinationFile));
+ try {
+ return copyFile(sourceFile, destinationFile);
+ } catch (Exception ex) {
+ LOG.error(String.format("Error copying file from %s to %s", sourceFile,
destinationFile),
+ ex);
+ throw ex;
+ }
+ }
+
+ public static String copyFileFromGCSToWorker(ExecutableFile execuableFile)
throws Exception {
+
+ ResourceId sourceFile =
+ FileSystems.matchNewResource(execuableFile.getSourceGCSLocation(),
false);
+ ResourceId destinationFile =
+ FileSystems.matchNewResource(execuableFile.getDestinationLocation(),
false);
+ try {
+ LOG.info(String.format("Moving File %s to %s ",
execuableFile.getSourceGCSLocation(),
+ execuableFile.getDestinationLocation()));
+ Path path = Paths.get(execuableFile.getDestinationLocation());
+
+ if (path.toFile().exists()) {
+ LOG.warn(String.format(
+ "Overwriting file %s, should only see this once per worker.",
+ execuableFile.getDestinationLocation()));
+ }
+ copyFile(sourceFile, destinationFile);
+ path.toFile().setExecutable(true);
+ return path.toString();
+
+ } catch (Exception ex) {
+ LOG.error(String.format("Error moving file : %s ",
execuableFile.fileName), ex);
+ throw ex;
+ }
+ }
+
+ public static String copyFile(ResourceId sourceFile, ResourceId
destinationFile)
+ throws IOException {
+
+ try (WritableByteChannel writeChannel =
FileSystems.create(destinationFile, "text/plain")) {
+ try (ReadableByteChannel readChannel = FileSystems.open(sourceFile)) {
+
+ final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
+ while (readChannel.read(buffer) != -1) {
+ buffer.flip();
+ writeChannel.write(buffer);
+ buffer.compact();
+ }
+ buffer.flip();
+ while (buffer.hasRemaining()) {
+ writeChannel.write(buffer);
+ }
+ }
+ }
+
+ return destinationFile.toString();
+ }
+
+ /**
+ * Create directories needed based on configuration.
+ * @param configuration
+ * @throws IOException
+ */
+ public static void createDirectoriesOnWorker(SubProcessConfiguration
configuration)
+ throws IOException {
+
+ try {
+
+ Path path = Paths.get(configuration.getWorkerPath());
+
+ if (!path.toFile().exists()) {
+ Files.createDirectories(path);
+ LOG.info(String.format("Created Folder %s ", path.toFile()));
+ }
+ } catch (FileAlreadyExistsException ex) {
+ LOG.warn(String.format(
+ " Tried to create folder %s which already existsed, this should not
happen!",
+ configuration.getWorkerPath()), ex);
+ }
+ }
+
+ public static String readLineOfLogFile(Path path) {
+
+ try (BufferedReader br = new BufferedReader(new
FileReader(path.toString()))) {
+ return br.readLine();
+ } catch (FileNotFoundException e) {
+ LOG.error("Error reading the first line of file", e);
+ } catch (IOException e) {
+ LOG.error("Error reading the first line of file", e);
+ }
+
+ // `return empty string rather than NULL string as this data is often used
in further logging
+ return "";
+ }
+}
diff --git
a/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java
b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java
new file mode 100644
index 00000000000..4a9bec7380a
--- /dev/null
+++
b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs;
+import
org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs.Command;
+import org.apache.beam.examples.subprocess.kernel.SubProcessKernel;
+import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * To keep {@link org.apache.beam.examples.subprocess.ExampleEchoPipeline}
simple,
+ * it is not factored or testable. This test file should be maintained with a
copy of its
+ * code for a basic smoke test.
+ **/
+public class ExampleEchoPipelineTest {
+
+ static final Logger LOG =
LoggerFactory.getLogger(ExampleEchoPipelineTest.class);
+
+ @Rule public TestPipeline p =
TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+ @Test public void testExampleEchoPipeline() throws Exception {
+
+ // Create two Bash files as tests for the binary files
+
+ Path fileA = Files.createTempFile("test-Echo", ".sh");
+ Path fileB = Files.createTempFile("test-EchoAgain", ".sh");
+
+ Path workerTempFiles = Files.createTempDirectory("test-Echoo");
+
+ try (SeekableByteChannel channel =
+ FileChannel.open(fileA, StandardOpenOption.CREATE,
StandardOpenOption.WRITE)) {
+ channel.write(ByteBuffer.wrap(getTestShellEcho().getBytes()));
+ }
+
+ try (SeekableByteChannel channel =
+ FileChannel.open(fileB, StandardOpenOption.CREATE,
StandardOpenOption.WRITE)) {
+ channel.write(ByteBuffer.wrap(getTestShellEchoAgain().getBytes()));
+ }
+
+
+ // Read in the options for the pipeline
+ SubProcessPipelineOptions options = PipelineOptionsFactory
+ .as(SubProcessPipelineOptions.class);
+
+ options.setConcurrency(2);
+ options.setSourcePath(fileA.getParent().toString());
+ options.setWorkerPath(workerTempFiles.toAbsolutePath().toString());
+
+ p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
+
+ // Setup the Configuration option used with all transforms
+ SubProcessConfiguration configuration =
options.getSubProcessConfiguration();
+
+ // Create some sample data to be fed to our c++ Echo library
+ List<KV<String, String>> sampleData = new ArrayList<>();
+
+ for (int i = 0; i < 100; i++) {
+ String str = String.valueOf(i);
+ sampleData.add(KV.of(str, str));
+ }
+
+ // Define the pipeline which is two transforms echoing the inputs out to
Logs
+ // For this use case we will make use of two shell files instead of the
binary to make
+ // testing easier
+ PCollection<KV<String, String>> output = p.apply(Create.of(sampleData))
+ .apply("Echo inputs round 1",
+ ParDo.of(new EchoInputDoFn(configuration,
fileA.getFileName().toString())))
+ .apply("Echo inputs round 2",
+ ParDo.of(new EchoInputDoFn(configuration,
fileB.getFileName().toString())));
+
+ PAssert.that(output).containsInAnyOrder(sampleData);
+
+ p.run();
+
+
+ }
+
+ /**
+ * Simple DoFn that echos the element, used as an example of running a C++
library.
+ */
+ @SuppressWarnings("serial") @RunWith(JUnit4.class) public static class
EchoInputDoFn
+ extends DoFn<KV<String, String>, KV<String, String>> {
+
+ static final Logger LOG = LoggerFactory.getLogger(EchoInputDoFn.class);
+
+ private SubProcessConfiguration configuration;
+ private String binaryName;
+
+ public EchoInputDoFn(SubProcessConfiguration configuration, String binary)
{
+ // Pass in configuration information the name of the filename of the
sub-process and the level
+ // of concurrency
+ this.configuration = configuration;
+ this.binaryName = binary;
+ }
+
+ @Setup public void setUp() throws Exception {
+ CallingSubProcessUtils.setUp(configuration, binaryName);
+ }
+
+ @ProcessElement public void processElement(ProcessContext c) throws
Exception {
+ try {
+ // Our Library takes a single command in position 0 which it will echo
back in the result
+ SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
+ Command command = new Command(0,
String.valueOf(c.element().getValue()));
+ commands.putCommand(command);
+
+ // The ProcessingKernel deals with the execution of the process
+ SubProcessKernel kernel = new SubProcessKernel(configuration,
binaryName);
+
+ // Run the command and work through the results
+ List<String> results = kernel.exec(commands);
+ for (String s : results) {
+ c.output(KV.of(c.element().getKey(), s));
+ }
+ } catch (Exception ex) {
+ LOG.error("Error processing element ", ex);
+ throw ex;
+ }
+ }
+ }
+
+ private static String getTestShellEcho(){
+ return "#!/bin/sh\n" + "filename=$1;\n" + "echo $2 >> $filename;";
+ }
+
+ private static String getTestShellEchoAgain(){
+ return "#!/bin/sh\n" + "filename=$1;\n" + "echo $2 >> $filename;";
+ }
+
+ private GcsUtil buildMockGcsUtil() throws IOException {
+ GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
+
+ // Any request to open gets a new bogus channel
+ Mockito.when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
+ .then(new Answer<SeekableByteChannel>() {
+
+ @Override public SeekableByteChannel answer(InvocationOnMock
invocation)
+ throws Throwable {
+ return FileChannel.open(Files.createTempFile("channel-", ".tmp"),
+ StandardOpenOption.CREATE,
+ StandardOpenOption.DELETE_ON_CLOSE);
+ }
+ });
+
+ // Any request for expansion returns a list containing the original GcsPath
+ // This is required to pass validation that occurs in TextIO during apply()
+ Mockito.when(mockGcsUtil.expand(Mockito.any(GcsPath.class))).then(new
Answer<List<GcsPath>>() {
+
+ @Override public List<GcsPath> answer(InvocationOnMock invocation)
throws Throwable {
+ return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+ }
+ });
+
+ return mockGcsUtil;
+ }
+
+ }
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 90002)
Time Spent: 40m (was: 0.5h)
> Add examples of running external libraries on workers
> -----------------------------------------------------
>
> Key: BEAM-2871
> URL: https://issues.apache.org/jira/browse/BEAM-2871
> Project: Beam
> Issue Type: New Feature
> Components: examples-java
> Affects Versions: 2.1.0
> Reporter: Reza ardeshir rokni
> Assignee: Reuven Lax
> Priority: Major
> Time Spent: 40m
> Remaining Estimate: 0h
>
> Provide an example of how to run external libraries, such as C++ code, within a
> DoFn.
> More details on this pattern can be found in the blog post:
> https://cloud.google.com/blog/big-data/2017/07/running-external-libraries-with-cloud-dataflow-for-grid-computing-workloads
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)