[ https://issues.apache.org/jira/browse/BEAM-2871?focusedWorklogId=90002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90002 ]
ASF GitHub Bot logged work on BEAM-2871: ---------------------------------------- Author: ASF GitHub Bot Created on: 11/Apr/18 15:36 Start Date: 11/Apr/18 15:36 Worklog Time Spent: 10m Work Description: reuvenlax closed pull request #3961: [BEAM-2871] Add advanced examples of running external libraries on workers URL: https://github.com/apache/beam/pull/3961 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/Echo.cc b/examples/java/src/main/java/org/apache/beam/examples/subprocess/Echo.cc new file mode 100644 index 00000000000..4ccb4dfef93 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/Echo.cc @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// 'Hello World!' 
program + +#include <iostream> +#include <fstream> + +int main(int argc, char* argv[]) +{ + if(argc < 3){ + std::cerr << "No parameter sent, must send the return file location and a statement to echo" << '\n'; + return 1; + } + std::string retFile = argv[1]; + std::string word = argv[2]; + std::ofstream myfile; + myfile.open (retFile); + myfile << word; + myfile.close(); + return 0; +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/EchoAgain.cc b/examples/java/src/main/java/org/apache/beam/examples/subprocess/EchoAgain.cc new file mode 100644 index 00000000000..419bf405da3 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/EchoAgain.cc @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +// 'Hello World!' program, just echos what was sent to it. + +#include <iostream> +#include <fstream> + +int main(int argc, char* argv[]) +{ + if(argc < 3){ + std::cerr << "No parameter sent, must send the return file location and a statement to echo" << '\n'; + return 1; + } + std::string retFile = argv[1]; + std::string word = argv[2]; + std::ofstream myfile; + myfile.open (retFile); + myfile << "You again? Well ok, here is your word again." 
<< word ; + myfile.close(); + return 0; +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java new file mode 100644 index 00000000000..7afda9ef3a8 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.subprocess; + +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration; +import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs; +import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs.Command; +import org.apache.beam.examples.subprocess.kernel.SubProcessKernel; +import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * In this example batch pipeline we will invoke a simple Echo C++ library within a DoFn. The sample + * makes use of an ExternalLibraryDoFn class which abstracts the setup and processing of the + * executable, logs and results. + * For this example we are using commands passed to the library based + * on ordinal position but for a production system you should use a mechanism like Protocol Buffers with + * Base64 encoding to pass the parameters to the library. + * To test this example you will need to build the files Echo.cc and EchoAgain.cc in a + * Linux env matching the runner that you are using (using g++ with static option). 
+ * Once built copy them to the SourcePath defined in + * {@link SubProcessPipelineOptions} + * + */ +public class ExampleEchoPipeline { + static final Logger LOG = LoggerFactory.getLogger(ExampleEchoPipeline.class); + + public static void main(String[] args) throws Exception { + + // Read in the options for the pipeline + SubProcessPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() + .as(SubProcessPipelineOptions.class); + + Pipeline p = Pipeline.create(options); + + // Setup the Configuration option used with all transforms + SubProcessConfiguration configuration = options.getSubProcessConfiguration(); + + // Create some sample data to be fed to our c++ Echo library + List<KV<String, String>> sampleData = new ArrayList<>(); + for (int i = 0; i < 10000; i++) { + String str = String.valueOf(i); + sampleData.add(KV.of(str, str)); + } + + // Define the pipeline which is two transforms echoing the inputs out to Logs + p.apply(Create.of(sampleData)) + .apply("Echo inputs round 1", ParDo.of( + new EchoInputDoFn(configuration, "Echo"))) + .apply("Echo inputs round 2", ParDo.of( + new EchoInputDoFn(configuration, "EchoAgain"))); + + p.run(); + } + + /** + * Simple DoFn that echos the element, used as an example of running a C++ library. 
+ */ + @SuppressWarnings("serial") + public static class EchoInputDoFn extends DoFn<KV<String, String>, KV<String, String>> { + + static final Logger LOG = LoggerFactory.getLogger(EchoInputDoFn.class); + + private SubProcessConfiguration configuration; + private String binaryName; + + public EchoInputDoFn(SubProcessConfiguration configuration, String binary) { + // Pass in configuration information the name of the filename of the sub-process and the level + // of concurrency + this.configuration = configuration; + this.binaryName = binary; + } + + @Setup + public void setUp() throws Exception { + CallingSubProcessUtils.setUp(configuration, binaryName); + } + + @ProcessElement + public void processElement(ProcessContext c) throws Exception { + try { + // Our Library takes a single command in position 0 which it will echo back in the result + SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs(); + Command command = new Command(0, String.valueOf(c.element().getValue())); + commands.putCommand(command); + + // The ProcessingKernel deals with the execution of the process + SubProcessKernel kernel = new SubProcessKernel(configuration, binaryName); + + // Run the command and work through the results + List<String> results = kernel.exec(commands); + for (String s : results) { + c.output(KV.of(c.element().getKey(), s)); + } + } catch (Exception ex) { + LOG.error("Error processing element ", ex); + throw ex; + } + } + } + + private static String getTestShellEcho(){ + return "#!/bin/sh\n" + "filename=$1;\n" + "echo $2 >> $filename;"; + } + + private static String getTestShellEchoAgain(){ + return "#!/bin/sh\n" + "filename=$1;\n" + + "echo \"You again? 
Well ok, here is your word again.\" >> $2 >> $filename;"; + } + +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java new file mode 100644 index 00000000000..85bd4496d08 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.subprocess; + + +import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration; +import org.apache.beam.sdk.options.Default; +import org.apache.beam.sdk.options.DefaultValueFactory; +import org.apache.beam.sdk.options.Description; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.options.Validation.Required; + +/** + * Options for running a sub process within a DoFn. 
+ */ +public interface SubProcessPipelineOptions extends PipelineOptions { + + @Description("Source GCS directory where the C++ library is located gs://bucket/tests") + @Required + String getSourcePath(); + + void setSourcePath(String sourcePath); + + @Description("Working directory for the process I/O") + @Default.String("/tmp/grid_working_files") + String getWorkerPath(); + + void setWorkerPath(String workerPath); + + @Description("The maximum time to wait for the sub-process to complete") + @Default.Integer(3600) + Integer getWaitTime(); + + void setWaitTime(Integer waitTime); + + @Description("As sub-processes can be heavy weight define the level of concurrency level") + @Required + Integer getConcurrency(); + + void setConcurrency(Integer concurrency); + + @Description("Should log files only be uploaded if error.") + @Default.Boolean(true) + Boolean getOnlyUpLoadLogsOnError(); + + void setOnlyUpLoadLogsOnError(Boolean onlyUpLoadLogsOnError); + + @Default.InstanceFactory(SubProcessConfigurationFactory.class) + SubProcessConfiguration getSubProcessConfiguration(); + + void setSubProcessConfiguration(SubProcessConfiguration configuration); + + /** + * Confirm Configuration and return a configuration object used in pipeline. 
+ */ + class SubProcessConfigurationFactory + implements DefaultValueFactory<SubProcessConfiguration> { + @Override + public SubProcessConfiguration create(PipelineOptions options) { + + SubProcessPipelineOptions subProcessPipelineOptions = (SubProcessPipelineOptions) options; + + SubProcessConfiguration configuration = new SubProcessConfiguration(); + + if (subProcessPipelineOptions.getSourcePath() == null) { + throw new IllegalStateException("Source path must be set"); + } + if (subProcessPipelineOptions.getConcurrency() == null + || subProcessPipelineOptions.getConcurrency() == 0) { + throw new IllegalStateException("Concurrency must be set and be > 0"); + } + configuration.setSourcePath(subProcessPipelineOptions.getSourcePath()); + configuration.setWorkerPath(subProcessPipelineOptions.getWorkerPath()); + configuration.setWaitTime(subProcessPipelineOptions.getWaitTime()); + configuration.setOnlyUpLoadLogsOnError(subProcessPipelineOptions.getOnlyUpLoadLogsOnError()); + configuration.concurrency = subProcessPipelineOptions.getConcurrency(); + + return configuration; + + } + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java new file mode 100644 index 00000000000..b55da9a3176 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.subprocess.configuration; + +import java.io.Serializable; + +/** + * Configuration file used to set up the Process kernel for execution of the external library. Values + * are copied from the Options to allow them to be Serializable. + */ +@SuppressWarnings("serial") +public class SubProcessConfiguration implements Serializable { + + // Source GCS directory where the C++ library is located gs://bucket/tests + public String sourcePath; + + // Working directory for the process I/O + public String workerPath; + + // The maximum time to wait for the sub-process to complete + public Integer waitTime; + + // "As sub-processes can be heavy weight match the concurrency level to num cores on the machines" + public Integer concurrency; + + // Should log files only be uploaded if error + public Boolean onlyUpLoadLogsOnError; + + public Boolean getOnlyUpLoadLogsOnError() { + return onlyUpLoadLogsOnError; + } + + public void setOnlyUpLoadLogsOnError(Boolean onlyUpLoadLogsOnError) { + this.onlyUpLoadLogsOnError = onlyUpLoadLogsOnError; + } + + public String getSourcePath() { + return sourcePath; + } + + public void setSourcePath(String sourcePath) { + this.sourcePath = sourcePath; + } + + public String getWorkerPath() { + return workerPath; + } + + public void setWorkerPath(String workerPath) { + this.workerPath = workerPath; + } + + public Integer getWaitTime() { + return waitTime; + } + + public void setWaitTime(Integer waitTime) { + this.waitTime = waitTime; + } + + public Integer getConcurrency() { + return concurrency; + } + + 
public void setConcurrency(Integer concurrency) { + this.concurrency = concurrency; + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessCommandLineArgs.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessCommandLineArgs.java new file mode 100644 index 00000000000..dd82ee91de9 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessCommandLineArgs.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.subprocess.kernel; + +import com.google.common.collect.Lists; +import java.util.List; + +/** + * Parameters to the sub-process, held as tuples of ordinal position and value. + */ +public class SubProcessCommandLineArgs { + + // Parameters to pass to the sub-process + private List<Command> parameters = Lists.newArrayList(); + + public void addCommand(Integer position, String value) { + parameters.add(new Command(position, value)); + } + + public void putCommand(Command command) { + parameters.add(command); + } + + public List<Command> getParameters() { + return parameters; + } + + /** + * Class used to store the SubProcess parameters. 
+ */ + public static class Command { + + // The ordinal position of the command to pass to the sub-process + int ordinalPosition; + String value; + + @SuppressWarnings("unused") + private Command(){} + + public Command(int ordinalPosition, String value) { + this.ordinalPosition = ordinalPosition; + this.value = value; + } + + public int getKey() { + return ordinalPosition; + } + + public void setKey(int key) { + this.ordinalPosition = key; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessIOFiles.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessIOFiles.java new file mode 100644 index 00000000000..82f8e3a7696 --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessIOFiles.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.subprocess.kernel; + +import java.io.Closeable; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; +import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration; +import org.apache.beam.examples.subprocess.utils.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * All information generated from the process will be stored in output files. The local working + * directory is used to generate three files with extension .err for standard error output .out for + * standard out output .ret for storing the results from the called library. The files will have a + * uuid created for them based on java.util.UUID + */ +public class SubProcessIOFiles implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(SubProcessIOFiles.class); + + Path errFile; + Path outFile; + Path resultFile; + Path base; + + String errFileLocation = ""; + String outFileLocation = ""; + String uuid; + + public String getErrFileLocation() { + return errFileLocation; + } + + public String getOutFileLocation() { + return outFileLocation; + } + + /** + * @param workerWorkingDirectory + */ + public SubProcessIOFiles(String workerWorkingDirectory) { + + this.uuid = UUID.randomUUID().toString(); + base = Paths.get(workerWorkingDirectory); + + // Setup all the redirect handles, including the return file type + errFile = Paths.get(base.toString(), uuid + ".err"); + outFile = Paths.get(base.toString(), uuid + ".out"); + resultFile = Paths.get(base.toString(), uuid + ".res"); + + } + + public Path getErrFile() { + return errFile; + } + + public Path getOutFile() { + return outFile; + } + + public Path getResultFile() { + return resultFile; + } + + /** + * Clean up the files that have been created on the local worker file system. 
+ * Without this expect both performance issues and eventual failure + */ + @Override + public void close() throws IOException { + + if (Files.exists(outFile)) { + Files.delete(outFile); + } + + if (Files.exists(errFile)) { + Files.delete(errFile); + } + + if (Files.exists(resultFile)) { + Files.delete(resultFile); + } + } + + /** + * Will copy the output files to the GCS path setup via the configuration. + * @param configuration + * @param params + */ + public void copyOutPutFilesToBucket(SubProcessConfiguration configuration, String params) { + if (Files.exists(outFile) || Files.exists(errFile)) { + try { + outFileLocation = FileUtils.copyFileFromWorkerToGCS(configuration, outFile); + } catch (Exception ex) { + LOG.error("Error uploading log file to storage ", ex); + } + + try { + errFileLocation = FileUtils.copyFileFromWorkerToGCS(configuration, errFile); + } catch (Exception ex) { + LOG.error("Error uploading log file to storage ", ex); + } + + LOG.info(String.format("Log Files for process: %s outFile was: %s errFile was: %s", params, + outFileLocation, errFileLocation)); + } else { + LOG.error(String.format("There was no output file or err file for process %s", params)); + } + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessKernel.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessKernel.java new file mode 100644 index 00000000000..e45bf5bf2ee --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessKernel.java @@ -0,0 +1,311 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.subprocess.kernel; + +import java.io.IOException; +import java.lang.ProcessBuilder.Redirect; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; +import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration; +import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils; +import org.apache.beam.examples.subprocess.utils.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * This is the process kernel which deals with exec of the subprocess. + * It also deals with all I/O. + */ +public class SubProcessKernel { + + private static final Logger LOG = LoggerFactory.getLogger(SubProcessKernel.class); + + private static final int MAX_SIZE_COMMAND_LINE_ARGS = 128 * 1024; + + SubProcessConfiguration configuration; + ProcessBuilder processBuilder; + + private SubProcessKernel() {} + + /** + * Creates the SubProcess Kernel ready for execution. 
+ * Will deal with all input and outputs to the SubProcess + * @param options + * @param binaryName + */ + public SubProcessKernel(SubProcessConfiguration options, String binaryName) { + this.configuration = options; + this.processBuilder = new ProcessBuilder(binaryName); + } + + public List<String> exec(SubProcessCommandLineArgs commands) throws Exception { + try (CallingSubProcessUtils.Permit permit = + new CallingSubProcessUtils.Permit(processBuilder.command().get(0))) { + + List<String> results = null; + + try (SubProcessIOFiles outputFiles = new SubProcessIOFiles(configuration.getWorkerPath())) { + + try { + Process process = execBinary(processBuilder, commands, outputFiles); + results = collectProcessResults(process, processBuilder, outputFiles); + } catch (Exception ex) { + LOG.error("Error running executable ", ex); + throw (ex); + } + } catch (IOException ex) { + LOG.error( + "Unable to delete the outputfiles. This can lead to performance issues and failure", + ex); + } + return results; + } + } + + public byte[] execBinaryResult(SubProcessCommandLineArgs commands) throws Exception { + try (CallingSubProcessUtils.Permit permit = + new CallingSubProcessUtils.Permit(processBuilder.command().get(0))) { + + + try (SubProcessIOFiles outputFiles = new SubProcessIOFiles(configuration.getWorkerPath())) { + + try { + Process process = execBinary(processBuilder, commands, outputFiles); + return collectProcessResultsBytes(process, processBuilder, outputFiles); + } catch (Exception ex) { + LOG.error("Error running executable ", ex); + throw (ex); + } + } catch (IOException ex) { + LOG.error( + "Unable to delete the outputfiles. 
This can lead to performance issues and failure", + ex); + } + return new byte[0]; + } + } + + private ProcessBuilder prepareBuilder(ProcessBuilder builder, SubProcessCommandLineArgs commands, + SubProcessIOFiles outPutFiles) throws IllegalStateException { + + // Check we are not over the max size of command line parameters + if (getTotalCommandBytes(commands) > MAX_SIZE_COMMAND_LINE_ARGS) { + throw new IllegalStateException("Command is over 2MB in size"); + } + + appendExecutablePath(builder); + + // Add the result file path to the builder at position 1, 0 is reserved for the process itself + builder.command().add(1, outPutFiles.resultFile.toString()); + + // Shift commands by 2 ordinal positions and load into the builder + if (commands != null) { + for (SubProcessCommandLineArgs.Command s : commands.getParameters()) { + builder.command().add(s.ordinalPosition + 2, s.value); + } + } + + builder.redirectError(Redirect.appendTo(outPutFiles.errFile.toFile())); + builder.redirectOutput(Redirect.appendTo(outPutFiles.outFile.toFile())); + + return builder; + } + + /** + * Add up the total bytes used by the process. + * @param commands + * @return + */ + private int getTotalCommandBytes(SubProcessCommandLineArgs commands) { + int size = 0; + for (SubProcessCommandLineArgs.Command c : commands.getParameters()) { + size += c.value.length(); + } + return size; + } + + private Process execBinary(ProcessBuilder builder, SubProcessCommandLineArgs commands, + SubProcessIOFiles outPutFiles) throws Exception { + try { + + builder = prepareBuilder(builder, commands, outPutFiles); + Process process = builder.start(); + + boolean timeout = !process.waitFor(configuration.getWaitTime(), TimeUnit.SECONDS); + + if (timeout) { + String log = String.format( + "Timeout waiting to run process with parameters %s . " + + "Check to see if your timeout is long enough. 
Currently set at %s.", + createLogEntryFromInputs(builder.command()), configuration.getWaitTime()); + throw new Exception(log); + } + return process; + + } catch (Exception ex) { + + LOG.error(String.format("Error running process with parameters %s error was %s ", + createLogEntryFromInputs(builder.command()), ex.getMessage())); + throw new Exception(ex); + + } + } + + /** + * TODO clean up duplicate with byte[] version collectBinaryProcessResults. + * @param process + * @param builder + * @param outPutFiles + * @return List of results + * @throws Exception if process has non 0 value or no logs found then throw exception + */ + private List<String> collectProcessResults(Process process, ProcessBuilder builder, + SubProcessIOFiles outPutFiles) throws Exception { + + List<String> results = new ArrayList<>(); + + try { + + LOG.debug(String.format("Executing process %s", createLogEntryFromInputs(builder.command()))); + + // If process exit value is not 0 then subprocess failed, record logs + if (process.exitValue() != 0) { + outPutFiles.copyOutPutFilesToBucket(configuration, FileUtils.toStringParams(builder)); + String log = createLogEntryForProcessFailure(process, builder.command(), outPutFiles); + throw new Exception(log); + } + + // If no return file then either something went wrong or the binary is setup incorrectly for + // the ret file either way throw error + if (!Files.exists(outPutFiles.resultFile)) { + String log = createLogEntryForProcessFailure(process, builder.command(), outPutFiles); + outPutFiles.copyOutPutFilesToBucket(configuration, FileUtils.toStringParams(builder)); + throw new Exception(log); + } + + // Everything looks healthy return values + try (Stream<String> lines = Files.lines(outPutFiles.resultFile)) { + for (String line : (Iterable<String>) lines::iterator) { + results.add(line); + } + } + return results; + } catch (Exception ex) { + String log = String.format("Unexpected error runnng process. 
%s error message was %s", + createLogEntryFromInputs(builder.command()), ex.getMessage()); + throw new Exception(log); + } + } + + /** + * Used when the result file contains binary data. + * @param process + * @param builder + * @param outPutFiles + * @return Binary results + * @throws Exception if the process exits with a non-zero value or no result file is found + */ + private byte[] collectProcessResultsBytes(Process process, ProcessBuilder builder, + SubProcessIOFiles outPutFiles) throws Exception { + + Byte[] results; + + try { + + LOG.debug(String.format("Executing process %s", createLogEntryFromInputs(builder.command()))); + + // If process exit value is not 0 then subprocess failed, record logs + if (process.exitValue() != 0) { + outPutFiles.copyOutPutFilesToBucket(configuration, FileUtils.toStringParams(builder)); + String log = createLogEntryForProcessFailure(process, builder.command(), outPutFiles); + throw new Exception(log); + } + + // If no return file then either something went wrong or the binary is set up incorrectly for + // the ret file; either way throw an error + if (!Files.exists(outPutFiles.resultFile)) { + String log = createLogEntryForProcessFailure(process, builder.command(), outPutFiles); + outPutFiles.copyOutPutFilesToBucket(configuration, FileUtils.toStringParams(builder)); + throw new Exception(log); + } + + // Everything looks healthy return bytes + return Files.readAllBytes(outPutFiles.resultFile); + + } catch (Exception ex) { + String log = String.format("Unexpected error runnng process. 
%s error message was %s", + createLogEntryFromInputs(builder.command()), ex.getMessage()); + throw new Exception(log); + } + } + + private static String createLogEntryForProcessFailure(Process process, List<String> commands, + SubProcessIOFiles files) { + + StringBuilder stringBuilder = new StringBuilder(); + + // Highlight when no result file is found vs standard process error + if (process.exitValue() == 0) { + stringBuilder.append( + String.format("%nProcess succeded but no result file was found %n")); + } else { + stringBuilder.append( + String.format("%nProcess error failed with exit value of %s %n", process.exitValue())); + } + + stringBuilder + .append(String.format("Command info was %s %n", createLogEntryFromInputs(commands))); + + stringBuilder.append(String.format("First line of error file is %s %n", + FileUtils.readLineOfLogFile(files.errFile))); + + stringBuilder.append(String.format("First line of out file is %s %n", + FileUtils.readLineOfLogFile(files.outFile))); + + stringBuilder.append(String.format("First line of ret file is %s %n", + FileUtils.readLineOfLogFile(files.resultFile))); + + return stringBuilder.toString(); + + } + + private static String createLogEntryFromInputs(List<String> commands) { + String params; + if (commands != null) { + params = String.join(",", commands); + } else { + params = "No-Commands"; + } + return params; + } + + // Pass the Path of the binary to the SubProcess in Command position 0 + private ProcessBuilder appendExecutablePath(ProcessBuilder builder) { + String executable = builder.command().get(0); + if (executable == null) { + throw new IllegalArgumentException( + "No executable provided to the Process Builder... we will do... nothing... 
"); + } + builder.command().set(0, + FileUtils.getFileResourceId(configuration.getWorkerPath(), executable).toString()); + return builder; + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/CallingSubProcessUtils.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/CallingSubProcessUtils.java new file mode 100644 index 00000000000..49e3ab7839b --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/CallingSubProcessUtils.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.subprocess.utils; + +import com.google.common.collect.Sets; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Semaphore; +import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utility class for dealing with concurrency and binary file copies to the worker. 
+ */ +public class CallingSubProcessUtils { + + // Prevent Instantiation + private CallingSubProcessUtils() {} + + static final Logger LOG = LoggerFactory.getLogger(CallingSubProcessUtils.class); + + static boolean initCompleted = false; + + // Allow multiple subclasses to create files, but only one thread per subclass can add the file to + // the worker + private static final Set<String> downloadedFiles = Sets.<String>newConcurrentHashSet(); + + // Limit the number of threads able to do work + private static Map<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>(); + + public static void setUp(SubProcessConfiguration configuration, String binaryName) + throws Exception { + + if (!semaphores.containsKey(binaryName)) { + initSemaphore(configuration.getConcurrency(), binaryName); + } + + synchronized (downloadedFiles) { + if (!downloadedFiles.contains(binaryName)) { + // Create Directories if needed + FileUtils.createDirectoriesOnWorker(configuration); + LOG.info("Calling filesetup to move Executables to worker."); + ExecutableFile executableFile = new ExecutableFile(configuration, binaryName); + FileUtils.copyFileFromGCSToWorker(executableFile); + downloadedFiles.add(binaryName); + } + } + } + + public static synchronized void initSemaphore(Integer permits, String binaryName) { + if (!semaphores.containsKey(binaryName)) { + LOG.info(String.format(String.format("Initialized Semaphore for binary %s ", binaryName))); + semaphores.put(binaryName, new Semaphore(permits)); + } + } + + private static void aquireSemaphore(String binaryName) throws IllegalStateException { + if (!semaphores.containsKey(binaryName)) { + throw new IllegalStateException("Semaphore is NULL, check init logic in @Setup."); + } + try { + semaphores.get(binaryName).acquire(); + } catch (InterruptedException ex) { + LOG.error("Interupted during aquire", ex); + } + } + + private static void releaseSemaphore(String binaryName) throws IllegalStateException { + if 
(!semaphores.containsKey(binaryName)) { + throw new IllegalStateException("Semaphore is NULL, check init logic in @Setup."); + } + semaphores.get(binaryName).release(); + } + + /** + * Permit class for access to worker cpu resources. + */ + public static class Permit implements AutoCloseable { + + private String binaryName; + + public Permit(String binaryName){ + this.binaryName = binaryName; + CallingSubProcessUtils.aquireSemaphore(binaryName); + } + + @Override + public void close() { + CallingSubProcessUtils.releaseSemaphore(binaryName); + } + + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/ExecutableFile.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/ExecutableFile.java new file mode 100644 index 00000000000..a5a789250cb --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/ExecutableFile.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.examples.subprocess.utils; + +import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration; +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Contains the configuration for the external library. + */ +@DefaultCoder(AvroCoder.class) +public class ExecutableFile { + + String fileName; + + private String sourceGCSLocation; + private String destinationLocation; + + static final Logger LOG = LoggerFactory.getLogger(ExecutableFile.class); + + public String getSourceGCSLocation() { + return sourceGCSLocation; + } + + public void setSourceGCSLocation(String sourceGCSLocation) { + this.sourceGCSLocation = sourceGCSLocation; + } + + public String getDestinationLocation() { + return destinationLocation; + } + + public void setDestinationLocation(String destinationLocation) { + this.destinationLocation = destinationLocation; + } + + public ExecutableFile(SubProcessConfiguration configuration, String fileName) + throws IllegalStateException { + if (configuration == null) { + throw new IllegalStateException("Configuration can not be NULL"); + } + if (fileName == null) { + throw new IllegalStateException("FileName can not be NULLt"); + } + this.fileName = fileName; + setDestinationLocation(configuration); + setSourceLocation(configuration); + } + + private void setDestinationLocation(SubProcessConfiguration configuration) { + this.sourceGCSLocation = + FileUtils.getFileResourceId(configuration.getSourcePath(), fileName).toString(); + } + + private void setSourceLocation(SubProcessConfiguration configuration) { + this.destinationLocation = + FileUtils.getFileResourceId(configuration.getWorkerPath(), fileName).toString(); + } +} diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/FileUtils.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/FileUtils.java new file 
mode 100644 index 00000000000..07e118b938d --- /dev/null +++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/FileUtils.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.subprocess.utils; + +import java.io.BufferedReader; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.WritableByteChannel; +import java.nio.file.FileAlreadyExistsException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Utilities for dealing with movement of files from object stores and workers. 
+ */ +public class FileUtils { + + private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class); + + public static ResourceId getFileResourceId(String directory, String fileName) { + ResourceId resourceID = FileSystems.matchNewResource(directory, true); + return resourceID.getCurrentDirectory().resolve(fileName, StandardResolveOptions.RESOLVE_FILE); + } + + public static String toStringParams(ProcessBuilder builder) { + return (String.join(",", builder.command())); + } + + public static String copyFileFromWorkerToGCS(SubProcessConfiguration configuration, + Path fileToUpload) throws Exception { + + Path fileName; + + if ((fileName = fileToUpload.getFileName()) == null) { + throw new IllegalArgumentException("FileName can not be null."); + } + + ResourceId sourceFile = getFileResourceId(configuration.getWorkerPath(), fileName.toString()); + + LOG.info("Copying file from worker " + sourceFile); + + ResourceId destinationFile = + getFileResourceId(configuration.getSourcePath(), fileName.toString()); + // TODO currently not supported with different schemas for example GCS to local, else could use + // FileSystems.copy(ImmutableList.of(sourceFile), ImmutableList.of(destinationFile)); + try { + return copyFile(sourceFile, destinationFile); + } catch (Exception ex) { + LOG.error(String.format("Error copying file from %s to %s", sourceFile, destinationFile), + ex); + throw ex; + } + } + + public static String copyFileFromGCSToWorker(ExecutableFile execuableFile) throws Exception { + + ResourceId sourceFile = + FileSystems.matchNewResource(execuableFile.getSourceGCSLocation(), false); + ResourceId destinationFile = + FileSystems.matchNewResource(execuableFile.getDestinationLocation(), false); + try { + LOG.info(String.format("Moving File %s to %s ", execuableFile.getSourceGCSLocation(), + execuableFile.getDestinationLocation())); + Path path = Paths.get(execuableFile.getDestinationLocation()); + + if (path.toFile().exists()) { + LOG.warn(String.format( + 
"Overwriting file %s, should only see this once per worker.", + execuableFile.getDestinationLocation())); + } + copyFile(sourceFile, destinationFile); + path.toFile().setExecutable(true); + return path.toString(); + + } catch (Exception ex) { + LOG.error(String.format("Error moving file : %s ", execuableFile.fileName), ex); + throw ex; + } + } + + public static String copyFile(ResourceId sourceFile, ResourceId destinationFile) + throws IOException { + + try (WritableByteChannel writeChannel = FileSystems.create(destinationFile, "text/plain")) { + try (ReadableByteChannel readChannel = FileSystems.open(sourceFile)) { + + final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024); + while (readChannel.read(buffer) != -1) { + buffer.flip(); + writeChannel.write(buffer); + buffer.compact(); + } + buffer.flip(); + while (buffer.hasRemaining()) { + writeChannel.write(buffer); + } + } + } + + return destinationFile.toString(); + } + + /** + * Create directories needed based on configuration. 
+ * @param configuration + * @throws IOException + */ + public static void createDirectoriesOnWorker(SubProcessConfiguration configuration) + throws IOException { + + try { + + Path path = Paths.get(configuration.getWorkerPath()); + + if (!path.toFile().exists()) { + Files.createDirectories(path); + LOG.info(String.format("Created Folder %s ", path.toFile())); + } + } catch (FileAlreadyExistsException ex) { + LOG.warn(String.format( + " Tried to create folder %s which already existsed, this should not happen!", + configuration.getWorkerPath()), ex); + } + } + + public static String readLineOfLogFile(Path path) { + + try (BufferedReader br = new BufferedReader(new FileReader(path.toString()))) { + return br.readLine(); + } catch (FileNotFoundException e) { + LOG.error("Error reading the first line of file", e); + } catch (IOException e) { + LOG.error("Error reading the first line of file", e); + } + + // `return empty string rather than NULL string as this data is often used in further logging + return ""; + } +} diff --git a/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java new file mode 100644 index 00000000000..4a9bec7380a --- /dev/null +++ b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.examples.subprocess; + +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.FileChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration; +import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs; +import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs.Command; +import org.apache.beam.examples.subprocess.kernel.SubProcessKernel; +import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils; +import org.apache.beam.sdk.extensions.gcp.options.GcsOptions; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.GcsUtil; +import org.apache.beam.sdk.util.gcsfs.GcsPath; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import org.slf4j.Logger; 
+import org.slf4j.LoggerFactory; + +/** + * To keep {@link org.apache.beam.examples.subprocess.ExampleEchoPipeline} simple, + * it is not factored or testable. This test file should be maintained with a copy of its + * code for a basic smoke test. + **/ +public class ExampleEchoPipelineTest { + + static final Logger LOG = LoggerFactory.getLogger(ExampleEchoPipelineTest.class); + + @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); + + @Test public void testExampleEchoPipeline() throws Exception { + + // Create two Bash files as tests for the binary files + + Path fileA = Files.createTempFile("test-Echo", ".sh"); + Path fileB = Files.createTempFile("test-EchoAgain", ".sh"); + + Path workerTempFiles = Files.createTempDirectory("test-Echoo"); + + try (SeekableByteChannel channel = + FileChannel.open(fileA, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + channel.write(ByteBuffer.wrap(getTestShellEcho().getBytes())); + } + + try (SeekableByteChannel channel = + FileChannel.open(fileB, StandardOpenOption.CREATE, StandardOpenOption.WRITE)) { + channel.write(ByteBuffer.wrap(getTestShellEchoAgain().getBytes())); + } + + + // Read in the options for the pipeline + SubProcessPipelineOptions options = PipelineOptionsFactory + .as(SubProcessPipelineOptions.class); + + options.setConcurrency(2); + options.setSourcePath(fileA.getParent().toString()); + options.setWorkerPath(workerTempFiles.toAbsolutePath().toString()); + + p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil()); + + // Setup the Configuration option used with all transforms + SubProcessConfiguration configuration = options.getSubProcessConfiguration(); + + // Create some sample data to be fed to our c++ Echo library + List<KV<String, String>> sampleData = new ArrayList<>(); + + for (int i = 0; i < 100; i++) { + String str = String.valueOf(i); + sampleData.add(KV.of(str, str)); + } + + // Define the pipeline which is two transforms echoing the inputs out 
to Logs + // For this use case we will make use of two shell files instead of the binary to make + // testing easier + PCollection<KV<String, String>> output = p.apply(Create.of(sampleData)) + .apply("Echo inputs round 1", + ParDo.of(new EchoInputDoFn(configuration, fileA.getFileName().toString()))) + .apply("Echo inputs round 2", + ParDo.of(new EchoInputDoFn(configuration, fileB.getFileName().toString()))); + + PAssert.that(output).containsInAnyOrder(sampleData); + + p.run(); + + + } + + /** + * Simple DoFn that echos the element, used as an example of running a C++ library. + */ + @SuppressWarnings("serial") @RunWith(JUnit4.class) public static class EchoInputDoFn + extends DoFn<KV<String, String>, KV<String, String>> { + + static final Logger LOG = LoggerFactory.getLogger(EchoInputDoFn.class); + + private SubProcessConfiguration configuration; + private String binaryName; + + public EchoInputDoFn(SubProcessConfiguration configuration, String binary) { + // Pass in configuration information the name of the filename of the sub-process and the level + // of concurrency + this.configuration = configuration; + this.binaryName = binary; + } + + @Setup public void setUp() throws Exception { + CallingSubProcessUtils.setUp(configuration, binaryName); + } + + @ProcessElement public void processElement(ProcessContext c) throws Exception { + try { + // Our Library takes a single command in position 0 which it will echo back in the result + SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs(); + Command command = new Command(0, String.valueOf(c.element().getValue())); + commands.putCommand(command); + + // The ProcessingKernel deals with the execution of the process + SubProcessKernel kernel = new SubProcessKernel(configuration, binaryName); + + // Run the command and work through the results + List<String> results = kernel.exec(commands); + for (String s : results) { + c.output(KV.of(c.element().getKey(), s)); + } + } catch (Exception ex) { + 
LOG.error("Error processing element ", ex); + throw ex; + } + } + } + + private static String getTestShellEcho(){ + return "#!/bin/sh\n" + "filename=$1;\n" + "echo $2 >> $filename;"; + } + + private static String getTestShellEchoAgain(){ + return "#!/bin/sh\n" + "filename=$1;\n" + "echo $2 >> $filename;"; + } + + private GcsUtil buildMockGcsUtil() throws IOException { + GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class); + + // Any request to open gets a new bogus channel + Mockito.when(mockGcsUtil.open(Mockito.any(GcsPath.class))) + .then(new Answer<SeekableByteChannel>() { + + @Override public SeekableByteChannel answer(InvocationOnMock invocation) + throws Throwable { + return FileChannel.open(Files.createTempFile("channel-", ".tmp"), + StandardOpenOption.CREATE, + StandardOpenOption.DELETE_ON_CLOSE); + } + }); + + // Any request for expansion returns a list containing the original GcsPath + // This is required to pass validation that occurs in TextIO during apply() + Mockito.when(mockGcsUtil.expand(Mockito.any(GcsPath.class))).then(new Answer<List<GcsPath>>() { + + @Override public List<GcsPath> answer(InvocationOnMock invocation) throws Throwable { + return ImmutableList.of((GcsPath) invocation.getArguments()[0]); + } + }); + + return mockGcsUtil; + } + + } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 90002) Time Spent: 40m (was: 0.5h) > Add examples of running external libraries on workers > ----------------------------------------------------- > > Key: BEAM-2871 > URL: https://issues.apache.org/jira/browse/BEAM-2871 > Project: Beam > Issue Type: New Feature > Components: examples-java > Affects Versions: 2.1.0 > Reporter: Reza ardeshir rokni > Assignee: Reuven Lax > Priority: Major > Time Spent: 40m > Remaining Estimate: 0h > > Provide example for how to run external libraries such as c++ code within a > DoFn. > More details of this pattern can be viewed in the blog: > https://cloud.google.com/blog/big-data/2017/07/running-external-libraries-with-cloud-dataflow-for-grid-computing-workloads -- This message was sent by Atlassian JIRA (v7.6.3#76005)