[ 
https://issues.apache.org/jira/browse/BEAM-2871?focusedWorklogId=90002&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-90002
 ]

ASF GitHub Bot logged work on BEAM-2871:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 11/Apr/18 15:36
            Start Date: 11/Apr/18 15:36
    Worklog Time Spent: 10m 
      Work Description: reuvenlax closed pull request #3961: [BEAM-2871] Add advanced examples of running external libraries on workers
URL: https://github.com/apache/beam/pull/3961
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/Echo.cc b/examples/java/src/main/java/org/apache/beam/examples/subprocess/Echo.cc
new file mode 100644
index 00000000000..4ccb4dfef93
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/Echo.cc
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// 'Hello World!' program
+
+#include <iostream>
+#include <fstream>
+
+// Writes argv[2] to the file named by argv[1] (the result file the Java kernel passes as the
+// first argument). Returns non-zero when arguments are missing or the result cannot be written,
+// so the caller can treat the run as a failure instead of silently getting an empty result.
+int main(int argc, char* argv[])
+{
+    if (argc < 3) {
+        std::cerr << "No parameter sent, must send the return file location and a statement to echo" << '\n';
+        return 1;
+    }
+    const std::string retFile = argv[1];
+    const std::string word = argv[2];
+    std::ofstream myfile(retFile);
+    if (!myfile) {
+        std::cerr << "Unable to open return file " << retFile << '\n';
+        return 1;
+    }
+    myfile << word;
+    myfile.close();
+    // close() sets failbit if flushing fails; a failed write earlier leaves badbit set.
+    return myfile.fail() ? 1 : 0;
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/EchoAgain.cc b/examples/java/src/main/java/org/apache/beam/examples/subprocess/EchoAgain.cc
new file mode 100644
index 00000000000..419bf405da3
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/EchoAgain.cc
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+// 'Hello World!' program, just echos what was sent to it.
+
+#include <iostream>
+#include <fstream>
+
+// Writes a fixed greeting followed by argv[2] to the file named by argv[1] (the result file the
+// Java kernel passes as the first argument). Returns non-zero when arguments are missing or the
+// result cannot be written.
+int main(int argc, char* argv[])
+{
+    if (argc < 3) {
+        std::cerr << "No parameter sent, must send the return file location and a statement to echo" << '\n';
+        return 1;
+    }
+    const std::string retFile = argv[1];
+    const std::string word = argv[2];
+    std::ofstream myfile(retFile);
+    if (!myfile) {
+        std::cerr << "Unable to open return file " << retFile << '\n';
+        return 1;
+    }
+    // Separate the greeting from the echoed word so the two do not run together.
+    myfile << "You again? Well ok, here is your word again. " << word;
+    myfile.close();
+    return myfile.fail() ? 1 : 0;
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java
new file mode 100644
index 00000000000..7afda9ef3a8
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/ExampleEchoPipeline.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess;
+
+import java.util.ArrayList;
+import java.util.List;
+import 
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs;
+import 
org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs.Command;
+import org.apache.beam.examples.subprocess.kernel.SubProcessKernel;
+import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * In this example batch pipeline we invoke a simple Echo C++ library within a DoFn. The sample
+ * makes use of the {@link EchoInputDoFn} class, which hands the setup and execution of the
+ * executable, its logs and its results to the sub-process kernel.
+ *
+ * <p>For this example we are passing commands to the library based on ordinal position, but for a
+ * production system you should use a mechanism like Protocol Buffers with Base64 encoding to pass
+ * the parameters to the library.
+ *
+ * <p>To test this example you will need to build the files Echo.cc and EchoAgain.cc in a Linux
+ * environment matching the runner that you are using (using g++ with the static option). Once
+ * built, copy them to the SourcePath defined in {@link SubProcessPipelineOptions}.
+ */
+public class ExampleEchoPipeline {
+  static final Logger LOG = LoggerFactory.getLogger(ExampleEchoPipeline.class);
+
+  public static void main(String[] args) throws Exception {
+
+    // Read in the options for the pipeline
+    SubProcessPipelineOptions options =
+        PipelineOptionsFactory.fromArgs(args).withValidation().as(SubProcessPipelineOptions.class);
+
+    Pipeline p = Pipeline.create(options);
+
+    // Setup the Configuration option used with all transforms
+    SubProcessConfiguration configuration = options.getSubProcessConfiguration();
+
+    // Create some sample data to be fed to our c++ Echo library
+    List<KV<String, String>> sampleData = new ArrayList<>();
+    for (int i = 0; i < 10000; i++) {
+      String str = String.valueOf(i);
+      sampleData.add(KV.of(str, str));
+    }
+
+    // Define the pipeline: two transforms, each passing every element through an external binary
+    p.apply(Create.of(sampleData))
+        .apply("Echo inputs round 1", ParDo.of(new EchoInputDoFn(configuration, "Echo")))
+        .apply("Echo inputs round 2", ParDo.of(new EchoInputDoFn(configuration, "EchoAgain")));
+
+    p.run();
+  }
+
+  /**
+   * Simple DoFn that echos the element, used as an example of running a C++ library.
+   *
+   * <p>Each element's value is passed to the binary as its first argument, and every line the
+   * binary writes to its result file is emitted paired with the element's key.
+   */
+  @SuppressWarnings("serial")
+  public static class EchoInputDoFn extends DoFn<KV<String, String>, KV<String, String>> {
+
+    static final Logger LOG = LoggerFactory.getLogger(EchoInputDoFn.class);
+
+    private final SubProcessConfiguration configuration;
+    private final String binaryName;
+
+    /**
+     * @param configuration sub-process settings (source path, working directory, wait time and
+     *     concurrency)
+     * @param binary file name of the executable to run, e.g. {@code "Echo"}
+     */
+    public EchoInputDoFn(SubProcessConfiguration configuration, String binary) {
+      this.configuration = configuration;
+      this.binaryName = binary;
+    }
+
+    @Setup
+    public void setUp() throws Exception {
+      // Presumably stages the binary onto this worker before any element is processed; see
+      // CallingSubProcessUtils.setUp.
+      CallingSubProcessUtils.setUp(configuration, binaryName);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      try {
+        // Our library takes a single command in position 0 which it will echo back in the result
+        SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
+        Command command = new Command(0, String.valueOf(c.element().getValue()));
+        commands.putCommand(command);
+
+        // A fresh kernel per element: SubProcessKernel appends this run's arguments to its own
+        // ProcessBuilder, so a kernel should not be reused across elements.
+        SubProcessKernel kernel = new SubProcessKernel(configuration, binaryName);
+
+        // Run the command and emit one output per line of the result
+        List<String> results = kernel.exec(commands);
+        for (String s : results) {
+          c.output(KV.of(c.element().getKey(), s));
+        }
+      } catch (Exception ex) {
+        LOG.error("Error processing element ", ex);
+        throw ex;
+      }
+    }
+  }
+
+  /**
+   * Shell-script equivalent of Echo.cc: writes the second argument to the file named by the
+   * first. Not referenced in this class; kept for testing without a C++ toolchain.
+   */
+  private static String getTestShellEcho() {
+    // Quote expansions so words containing spaces or glob characters are passed through intact
+    return "#!/bin/sh\n" + "filename=$1;\n" + "echo \"$2\" >> \"$filename\";";
+  }
+
+  /**
+   * Shell-script equivalent of EchoAgain.cc: writes a greeting followed by the second argument
+   * to the file named by the first. Not referenced in this class.
+   */
+  private static String getTestShellEchoAgain() {
+    // The previous form ("... >> $2 >> $filename") created a file named after the word and never
+    // echoed the word itself.
+    return "#!/bin/sh\n" + "filename=$1;\n"
+        + "echo \"You again? Well ok, here is your word again. $2\" >> \"$filename\";";
+  }
+
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java
new file mode 100644
index 00000000000..85bd4496d08
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/SubProcessPipelineOptions.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess;
+
+
+import 
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.Validation.Required;
+
+/**
+ * Options for running a sub process within a DoFn.
+ */
+public interface SubProcessPipelineOptions extends PipelineOptions {
+
+  @Description("Source GCS directory where the C++ library is located 
gs://bucket/tests")
+  @Required
+  String getSourcePath();
+
+  void setSourcePath(String sourcePath);
+
+  @Description("Working directory for the process I/O")
+  @Default.String("/tmp/grid_working_files")
+  String getWorkerPath();
+
+  void setWorkerPath(String workerPath);
+
+  @Description("The maximum time to wait for the sub-process to complete")
+  @Default.Integer(3600)
+  Integer getWaitTime();
+
+  void setWaitTime(Integer waitTime);
+
+  @Description("As sub-processes can be heavy weight define the level of 
concurrency level")
+  @Required
+  Integer getConcurrency();
+
+  void setConcurrency(Integer concurrency);
+
+  @Description("Should log files only be uploaded if error.")
+  @Default.Boolean(true)
+  Boolean getOnlyUpLoadLogsOnError();
+
+  void setOnlyUpLoadLogsOnError(Boolean onlyUpLoadLogsOnError);
+
+  @Default.InstanceFactory(SubProcessConfigurationFactory.class)
+  SubProcessConfiguration getSubProcessConfiguration();
+
+  void setSubProcessConfiguration(SubProcessConfiguration configuration);
+
+  /**
+   * Confirm Configuration and return a configuration object used in pipeline.
+   */
+  class SubProcessConfigurationFactory
+      implements DefaultValueFactory<SubProcessConfiguration> {
+    @Override
+    public SubProcessConfiguration create(PipelineOptions options) {
+
+      SubProcessPipelineOptions subProcessPipelineOptions = 
(SubProcessPipelineOptions) options;
+
+      SubProcessConfiguration configuration = new SubProcessConfiguration();
+
+      if (subProcessPipelineOptions.getSourcePath() == null) {
+        throw new IllegalStateException("Source path must be set");
+      }
+      if (subProcessPipelineOptions.getConcurrency() == null
+          || subProcessPipelineOptions.getConcurrency() == 0) {
+        throw new IllegalStateException("Concurrency must be set and be > 0");
+      }
+      configuration.setSourcePath(subProcessPipelineOptions.getSourcePath());
+      configuration.setWorkerPath(subProcessPipelineOptions.getWorkerPath());
+      configuration.setWaitTime(subProcessPipelineOptions.getWaitTime());
+      
configuration.setOnlyUpLoadLogsOnError(subProcessPipelineOptions.getOnlyUpLoadLogsOnError());
+      configuration.concurrency = subProcessPipelineOptions.getConcurrency();
+
+      return configuration;
+
+    }
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java
new file mode 100644
index 00000000000..b55da9a3176
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/configuration/SubProcessConfiguration.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.configuration;
+
+import java.io.Serializable;
+
+/**
+ * Configuration used to set up the process kernel for executing the external library. Values are
+ * copied out of the pipeline options into this class so that they are Serializable and can be held
+ * by serializable objects such as DoFns.
+ */
+@SuppressWarnings("serial")
+public class SubProcessConfiguration implements Serializable {
+
+  // Source GCS directory where the C++ library is located, e.g. gs://bucket/tests
+  public String sourcePath;
+
+  public String getSourcePath() {
+    return this.sourcePath;
+  }
+
+  public void setSourcePath(String sourcePath) {
+    this.sourcePath = sourcePath;
+  }
+
+  // Local working directory for the process I/O files
+  public String workerPath;
+
+  public String getWorkerPath() {
+    return this.workerPath;
+  }
+
+  public void setWorkerPath(String workerPath) {
+    this.workerPath = workerPath;
+  }
+
+  // Maximum time to wait for the sub-process to complete
+  public Integer waitTime;
+
+  public Integer getWaitTime() {
+    return this.waitTime;
+  }
+
+  public void setWaitTime(Integer waitTime) {
+    this.waitTime = waitTime;
+  }
+
+  // Sub-processes can be heavy weight; match this to the number of cores on the machines
+  public Integer concurrency;
+
+  public Integer getConcurrency() {
+    return this.concurrency;
+  }
+
+  public void setConcurrency(Integer concurrency) {
+    this.concurrency = concurrency;
+  }
+
+  // Whether log files should only be uploaded when there is an error
+  public Boolean onlyUpLoadLogsOnError;
+
+  public Boolean getOnlyUpLoadLogsOnError() {
+    return this.onlyUpLoadLogsOnError;
+  }
+
+  public void setOnlyUpLoadLogsOnError(Boolean onlyUpLoadLogsOnError) {
+    this.onlyUpLoadLogsOnError = onlyUpLoadLogsOnError;
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessCommandLineArgs.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessCommandLineArgs.java
new file mode 100644
index 00000000000..dd82ee91de9
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessCommandLineArgs.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.kernel;
+
+import com.google.common.collect.Lists;
+import java.util.List;
+
+/**
+ * Parameters to the sub-process, has tuple of ordinal position and the value.
+ */
+public class SubProcessCommandLineArgs {
+
+  // Parameters to pass to the sub-process
+  private List<Command> parameters = Lists.newArrayList();
+
+  public void addCommand(Integer position, String value) {
+    parameters.add(new Command(position, value));
+  }
+
+  public void putCommand(Command command) {
+    parameters.add(command);
+  }
+
+  public List<Command> getParameters() {
+    return parameters;
+  }
+
+  /**
+   * Class used to store the SubProcces parameters.
+   */
+  public static class Command {
+
+    // The ordinal position of the command to pass to the sub-process
+    int ordinalPosition;
+    String value;
+
+    @SuppressWarnings("unused")
+    private Command(){}
+
+    public Command(int ordinalPosition, String value) {
+      this.ordinalPosition = ordinalPosition;
+      this.value = value;
+    }
+
+    public int getKey() {
+      return ordinalPosition;
+    }
+
+    public void setKey(int key) {
+      this.ordinalPosition = key;
+    }
+
+    public String getValue() {
+      return value;
+    }
+
+    public void setValue(String value) {
+      this.value = value;
+    }
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessIOFiles.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessIOFiles.java
new file mode 100644
index 00000000000..82f8e3a7696
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessIOFiles.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.kernel;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.UUID;
+import 
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.examples.subprocess.utils.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * All information generated from the process will be stored in output files. 
The local working
+ * directory is used to generate three files with extension .err for standard 
error output .out for
+ * standard out output .ret for storing the results from the called library. 
The files will have a
+ * uuid created for them based on java.util.UUID
+ */
+public class SubProcessIOFiles implements Closeable {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SubProcessIOFiles.class);
+
+  Path errFile;
+  Path outFile;
+  Path resultFile;
+  Path base;
+
+  String errFileLocation = "";
+  String outFileLocation = "";
+  String uuid;
+
+  public String getErrFileLocation() {
+    return errFileLocation;
+  }
+
+  public String getOutFileLocation() {
+    return outFileLocation;
+  }
+
+  /**
+   * @param workerWorkingDirectory
+   */
+  public SubProcessIOFiles(String workerWorkingDirectory) {
+
+    this.uuid = UUID.randomUUID().toString();
+    base = Paths.get(workerWorkingDirectory);
+
+    // Setup all the redirect handles, including the return file type
+    errFile = Paths.get(base.toString(), uuid + ".err");
+    outFile = Paths.get(base.toString(), uuid + ".out");
+    resultFile = Paths.get(base.toString(), uuid + ".res");
+
+  }
+
+  public Path getErrFile() {
+    return errFile;
+  }
+
+  public Path getOutFile() {
+    return outFile;
+  }
+
+  public Path getResultFile() {
+    return resultFile;
+  }
+
+  /**
+   * Clean up the files that have been created on the local worker file system.
+   * Without this expect both performance issues and eventual failure
+   */
+  @Override
+  public void close() throws IOException {
+
+    if (Files.exists(outFile)) {
+      Files.delete(outFile);
+    }
+
+    if (Files.exists(errFile)) {
+      Files.delete(errFile);
+    }
+
+    if (Files.exists(resultFile)) {
+      Files.delete(resultFile);
+    }
+  }
+
+  /**
+   * Will copy the output files to the GCS path setup via the configuration.
+   * @param configuration
+   * @param params
+   */
+  public void copyOutPutFilesToBucket(SubProcessConfiguration configuration, 
String params) {
+    if (Files.exists(outFile) || Files.exists(errFile)) {
+      try {
+        outFileLocation = FileUtils.copyFileFromWorkerToGCS(configuration, 
outFile);
+      } catch (Exception ex) {
+        LOG.error("Error uploading log file to storage ", ex);
+      }
+
+      try {
+        errFileLocation = FileUtils.copyFileFromWorkerToGCS(configuration, 
errFile);
+      } catch (Exception ex) {
+        LOG.error("Error uploading log file to storage ", ex);
+      }
+
+      LOG.info(String.format("Log Files for process: %s outFile was: %s 
errFile was: %s", params,
+          outFileLocation, errFileLocation));
+    } else {
+      LOG.error(String.format("There was no output file or err file for 
process %s", params));
+    }
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessKernel.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessKernel.java
new file mode 100644
index 00000000000..e45bf5bf2ee
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/kernel/SubProcessKernel.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.kernel;
+
+import java.io.IOException;
+import java.lang.ProcessBuilder.Redirect;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
+import 
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils;
+import org.apache.beam.examples.subprocess.utils.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This is the process kernel which deals with exec of the subprocess.
+ * It also deals with all I/O.
+ */
+public class SubProcessKernel {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(SubProcessKernel.class);
+
+  private static final int MAX_SIZE_COMMAND_LINE_ARGS = 128 * 1024;
+
+  SubProcessConfiguration configuration;
+  ProcessBuilder processBuilder;
+
+  // Not callable from outside; a kernel always needs a configuration and a binary name.
+  private SubProcessKernel() {
+  }
+
+  /**
+   * Creates a kernel ready to run {@code binaryName}; it handles all inputs to and outputs from
+   * the sub-process.
+   *
+   * @param options configuration supplying, among others, the worker path and the wait time
+   * @param binaryName executable to run; it becomes element 0 of the command line
+   */
+  public SubProcessKernel(SubProcessConfiguration options, String binaryName) {
+    processBuilder = new ProcessBuilder(binaryName);
+    configuration = options;
+  }
+
+  /**
+   * Runs the binary once with the given arguments and returns the lines of its result file.
+   *
+   * <p>A {@link CallingSubProcessUtils.Permit} created from the first element of the command is
+   * held for the whole run. The per-run I/O files are closed (deleted) afterwards; a failure to
+   * delete them is logged, and the results already collected are still returned.
+   *
+   * <p>NOTE(review): the {@code catch (IOException)} below handles any IOException escaping the
+   * inner block, not only close failures, in which case {@code null} would be returned. The helpers
+   * currently wrap their failures in plain {@link Exception}, so this looks unreachable; confirm.
+   * Each call also appends arguments to the shared {@code processBuilder} (see prepareBuilder), so
+   * calling exec() twice on one kernel presumably accumulates arguments. TODO confirm.
+   *
+   * @param commands arguments placed after the result-file path on the command line
+   * @return lines read from the result file
+   * @throws Exception if the process cannot be started, times out, exits non-zero or writes no
+   *     result file
+   */
+  public List<String> exec(SubProcessCommandLineArgs commands) throws 
Exception {
+    try (CallingSubProcessUtils.Permit permit =
+        new CallingSubProcessUtils.Permit(processBuilder.command().get(0))) {
+
+      // Declared outside the resource block so results survive a failed close of outputFiles
+      List<String> results = null;
+
+      try (SubProcessIOFiles outputFiles = new 
SubProcessIOFiles(configuration.getWorkerPath())) {
+
+        try {
+          Process process = execBinary(processBuilder, commands, outputFiles);
+          results = collectProcessResults(process, processBuilder, 
outputFiles);
+        } catch (Exception ex) {
+          LOG.error("Error running executable ", ex);
+          throw (ex);
+        }
+      } catch (IOException ex) {
+        // Cleanup failure is logged rather than failing an otherwise successful run
+        LOG.error(
+            "Unable to delete the outputfiles. This can lead to performance 
issues and failure",
+            ex);
+      }
+      return results;
+    }
+  }
+
+  public byte[] execBinaryResult(SubProcessCommandLineArgs commands) throws 
Exception {
+    try (CallingSubProcessUtils.Permit permit =
+        new CallingSubProcessUtils.Permit(processBuilder.command().get(0))) {
+
+
+      try (SubProcessIOFiles outputFiles = new 
SubProcessIOFiles(configuration.getWorkerPath())) {
+
+        try {
+          Process process = execBinary(processBuilder, commands, outputFiles);
+          return collectProcessResultsBytes(process, processBuilder, 
outputFiles);
+        } catch (Exception ex) {
+          LOG.error("Error running executable ", ex);
+          throw (ex);
+        }
+      } catch (IOException ex) {
+        LOG.error(
+            "Unable to delete the outputfiles. This can lead to performance 
issues and failure",
+            ex);
+      }
+      return new byte[0];
+    }
+  }
+
+  /**
+   * Adds the result-file path and the caller's arguments to {@code builder} and redirects the
+   * process's stderr/stdout (append mode) to the run's .err/.out files.
+   *
+   * <p>NOTE(review): {@code builder.command()} is the builder's live argument list and is mutated
+   * in place. exec() passes the shared {@code processBuilder} field, so preparing the same builder
+   * twice accumulates arguments.
+   *
+   * @param builder builder whose command list starts with the executable
+   * @param commands arguments to add; {@code null} means none
+   * @param outPutFiles I/O files for this run
+   * @return the same builder, prepared
+   * @throws IllegalStateException if the arguments exceed {@code MAX_SIZE_COMMAND_LINE_ARGS} bytes
+   */
+  private ProcessBuilder prepareBuilder(ProcessBuilder builder, SubProcessCommandLineArgs commands,
+      SubProcessIOFiles outPutFiles) throws IllegalStateException {
+
+    // Check we are not over the max size of command line parameters. Null commands count as no
+    // parameters, matching the null check below (previously this line threw an NPE first).
+    int commandBytes = (commands == null) ? 0 : getTotalCommandBytes(commands);
+    if (commandBytes > MAX_SIZE_COMMAND_LINE_ARGS) {
+      // Report the real limit; the old message claimed 2MB while the limit is 128 * 1024 bytes
+      throw new IllegalStateException(String.format(
+          "Command is %d bytes, over the %d byte limit", commandBytes, MAX_SIZE_COMMAND_LINE_ARGS));
+    }
+
+    appendExecutablePath(builder);
+
+    // Add the result file path to the builder at position 1, 0 is reserved for the process itself
+    builder.command().add(1, outPutFiles.resultFile.toString());
+
+    // Shift commands by 2 ordinal positions and load into the builder
+    if (commands != null) {
+      for (SubProcessCommandLineArgs.Command s : commands.getParameters()) {
+        builder.command().add(s.ordinalPosition + 2, s.value);
+      }
+    }
+
+    builder.redirectError(Redirect.appendTo(outPutFiles.errFile.toFile()));
+    builder.redirectOutput(Redirect.appendTo(outPutFiles.outFile.toFile()));
+
+    return builder;
+  }
+
+  /**
+   * Add up the total size, in UTF-8 bytes, of the argument values to pass to the process.
+   *
+   * @param commands the arguments; {@code null} is treated as no arguments
+   * @return total encoded size of all argument values, in bytes
+   */
+  private int getTotalCommandBytes(SubProcessCommandLineArgs commands) {
+    if (commands == null) {
+      return 0;
+    }
+    int size = 0;
+    for (SubProcessCommandLineArgs.Command c : commands.getParameters()) {
+      // Count encoded bytes, not UTF-16 chars, so non-ASCII values are measured in the unit the
+      // limit is expressed in.
+      size += c.value.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
+    }
+    return size;
+  }
+
+  /**
+   * Prepares {@code builder}, starts the process and waits up to the configured wait time (in
+   * seconds) for it to exit.
+   *
+   * <p>If the wait times out or fails (for example, the thread is interrupted), the process is
+   * forcibly destroyed so it does not keep running. On interruption, the thread's interrupt flag
+   * is restored.
+   *
+   * @param builder builder to prepare and start
+   * @param commands arguments for the process
+   * @param outPutFiles I/O files for this run
+   * @return the exited process
+   * @throws Exception wrapping the underlying failure, including a timeout
+   */
+  private Process execBinary(ProcessBuilder builder, SubProcessCommandLineArgs commands,
+      SubProcessIOFiles outPutFiles) throws Exception {
+    Process process = null;
+    try {
+
+      builder = prepareBuilder(builder, commands, outPutFiles);
+      process = builder.start();
+
+      boolean timeout = !process.waitFor(configuration.getWaitTime(), TimeUnit.SECONDS);
+
+      if (timeout) {
+        String log = String.format(
+            "Timeout waiting to run process with parameters %s . "
+                + "Check to see if your timeout is long enough. Currently set at %s.",
+            createLogEntryFromInputs(builder.command()), configuration.getWaitTime());
+        throw new Exception(log);
+      }
+      return process;
+
+    } catch (Exception ex) {
+      // Don't leave an abandoned sub-process running: it would keep using worker resources and
+      // may still be writing to the redirected .out/.err files that are about to be cleaned up.
+      if (process != null && process.isAlive()) {
+        process.destroyForcibly();
+      }
+      if (ex instanceof InterruptedException) {
+        // Restore the interrupt flag so callers can still observe the interruption
+        Thread.currentThread().interrupt();
+      }
+      LOG.error(String.format("Error running process with parameters %s error was %s ",
+          createLogEntryFromInputs(builder.command()), ex.getMessage()), ex);
+      throw new Exception(ex);
+
+    }
+  }
+
+  /**
+   * Checks a finished process and reads its result file line by line.
+   *
+   * <p>TODO clean up duplicate with byte[] version collectProcessResultsBytes.
+   *
+   * @param process process expected to have already exited ({@code exitValue()} is read directly)
+   * @param builder builder used to start the process; used for logging and log upload
+   * @param outPutFiles I/O files of this run
+   * @return List of results, one entry per line of the result file
+   * @throws Exception if the process has a non 0 exit value or wrote no result file; logs are
+   *     uploaded first, and the original failure is kept as the cause
+   */
+  private List<String> collectProcessResults(Process process, ProcessBuilder builder,
+      SubProcessIOFiles outPutFiles) throws Exception {
+
+    List<String> results = new ArrayList<>();
+
+    try {
+
+      LOG.debug(String.format("Executing process %s", createLogEntryFromInputs(builder.command())));
+
+      // If process exit value is not 0 then subprocess failed, record logs
+      if (process.exitValue() != 0) {
+        outPutFiles.copyOutPutFilesToBucket(configuration, FileUtils.toStringParams(builder));
+        String log = createLogEntryForProcessFailure(process, builder.command(), outPutFiles);
+        throw new Exception(log);
+      }
+
+      // If no return file then either something went wrong or the binary is setup incorrectly for
+      // the ret file either way throw error
+      if (!Files.exists(outPutFiles.resultFile)) {
+        String log = createLogEntryForProcessFailure(process, builder.command(), outPutFiles);
+        outPutFiles.copyOutPutFilesToBucket(configuration, FileUtils.toStringParams(builder));
+        throw new Exception(log);
+      }
+
+      // Everything looks healthy return values
+      try (Stream<String> lines = Files.lines(outPutFiles.resultFile)) {
+        lines.forEach(results::add);
+      }
+      return results;
+    } catch (Exception ex) {
+      String log = String.format("Unexpected error runnng process. %s error message was %s",
+          createLogEntryFromInputs(builder.command()), ex.getMessage());
+      // Keep the original exception as the cause so its type and stack trace are not lost
+      throw new Exception(log, ex);
+    }
+  }
+
+  /**
+   * Used when the result file contains binary data: returns the whole result file as bytes.
+   *
+   * @param process the subprocess; its {@code exitValue()} is read, so it is expected to have
+   *     exited already
+   * @param builder the builder that started the process; its command is used in log messages
+   * @param outPutFiles the run's files; {@code resultFile} is read on success, and the files are
+   *     copied to the bucket on failure
+   * @return Binary results
+   * @throws Exception if the process exit value is non 0, the result file is missing, or reading
+   *     it fails; the underlying exception is kept as the cause
+   */
+  private byte[] collectProcessResultsBytes(Process process, ProcessBuilder builder,
+      SubProcessIOFiles outPutFiles) throws Exception {
+
+    try {
+
+      LOG.debug(String.format("Executing process %s", createLogEntryFromInputs(builder.command())));
+
+      // If process exit value is not 0 then subprocess failed, record logs
+      if (process.exitValue() != 0) {
+        outPutFiles.copyOutPutFilesToBucket(configuration, FileUtils.toStringParams(builder));
+        String log = createLogEntryForProcessFailure(process, builder.command(), outPutFiles);
+        throw new Exception(log);
+      }
+
+      // If no return file then either something went wrong or the binary is setup incorrectly for
+      // the ret file either way throw error
+      if (!Files.exists(outPutFiles.resultFile)) {
+        String log = createLogEntryForProcessFailure(process, builder.command(), outPutFiles);
+        outPutFiles.copyOutPutFilesToBucket(configuration, FileUtils.toStringParams(builder));
+        throw new Exception(log);
+      }
+
+      // Everything looks healthy return bytes
+      return Files.readAllBytes(outPutFiles.resultFile);
+
+    } catch (Exception ex) {
+      String log = String.format("Unexpected error running process. %s error message was %s",
+          createLogEntryFromInputs(builder.command()), ex.getMessage());
+      // Keep the original exception as the cause so its stack trace is not lost.
+      throw new Exception(log, ex);
+    }
+  }
+
+  /**
+   * Builds a multi-line diagnostic message describing why a subprocess run is considered failed.
+   *
+   * <p>A zero exit value is reported as "succeeded but no result file", any other exit value as
+   * a process error. The message then lists the command and the first line of the error, output
+   * and result files (read in that order).
+   *
+   * @param process the subprocess; {@code exitValue()} is read from it
+   * @param commands the command tokens used to start the process
+   * @param files the run's error, output and result files
+   * @return the formatted message
+   */
+  private static String createLogEntryForProcessFailure(Process process, List<String> commands,
+      SubProcessIOFiles files) {
+
+    int exitCode = process.exitValue();
+
+    // Highlight when no result file is found vs standard process error
+    String headline = (exitCode == 0)
+        ? String.format("%nProcess succeded but no result file was found %n")
+        : String.format("%nProcess error failed with exit value of %s %n", exitCode);
+
+    return new StringBuilder()
+        .append(headline)
+        .append(String.format("Command info was %s %n", createLogEntryFromInputs(commands)))
+        .append(String.format("First line of error file is  %s %n",
+            FileUtils.readLineOfLogFile(files.errFile)))
+        .append(String.format("First line of out file is %s %n",
+            FileUtils.readLineOfLogFile(files.outFile)))
+        .append(String.format("First line of ret file is %s %n",
+            FileUtils.readLineOfLogFile(files.resultFile)))
+        .toString();
+  }
+
+  /**
+   * Renders a command list for log messages.
+   *
+   * @param commands the command tokens; may be {@code null}
+   * @return the tokens joined with {@code ","}, or {@code "No-Commands"} when {@code commands}
+   *     is null
+   */
+  private static String createLogEntryFromInputs(List<String> commands) {
+    return (commands == null) ? "No-Commands" : String.join(",", commands);
+  }
+
+  /**
+   * Replaces the executable name in command position 0 with its full path under the configured
+   * worker path, so the SubProcess runs the copy of the binary on the worker.
+   *
+   * @param builder builder whose command list starts with the executable name
+   * @return the same builder; its command list is modified in place
+   * @throws IllegalArgumentException if the command list is empty or position 0 is null
+   */
+  private ProcessBuilder appendExecutablePath(ProcessBuilder builder) {
+    // ProcessBuilder.command() returns the live list, so set(0, ...) below updates the builder.
+    List<String> command = builder.command();
+    // Check emptiness first: get(0) on an empty list would throw IndexOutOfBoundsException.
+    if (command.isEmpty() || command.get(0) == null) {
+      throw new IllegalArgumentException(
+          "No executable provided to the Process Builder... we will do... nothing... ");
+    }
+    String executable = command.get(0);
+    command.set(0,
+        FileUtils.getFileResourceId(configuration.getWorkerPath(), executable).toString());
+    return builder;
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/CallingSubProcessUtils.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/CallingSubProcessUtils.java
new file mode 100644
index 00000000000..49e3ab7839b
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/CallingSubProcessUtils.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.utils;
+
+import com.google.common.collect.Sets;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+import 
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility class for dealing with concurrency and binary file copies to the 
worker.
+ */
+public class CallingSubProcessUtils {
+
+  // Prevent Instantiation
+  private CallingSubProcessUtils() {}
+
+  static final Logger LOG = 
LoggerFactory.getLogger(CallingSubProcessUtils.class);
+
+  static boolean initCompleted = false;
+
+  // Allow multiple subclasses to create files, but only one thread per 
subclass can add the file to
+  // the worker
+  private static final Set<String> downloadedFiles = 
Sets.<String>newConcurrentHashSet();
+
+  // Limit the number of threads able to do work
+  private static Map<String, Semaphore> semaphores = new 
ConcurrentHashMap<String, Semaphore>();
+
+  public static void setUp(SubProcessConfiguration configuration, String 
binaryName)
+      throws Exception {
+
+    if (!semaphores.containsKey(binaryName)) {
+      initSemaphore(configuration.getConcurrency(), binaryName);
+    }
+
+    synchronized (downloadedFiles) {
+      if (!downloadedFiles.contains(binaryName)) {
+        // Create Directories if needed
+        FileUtils.createDirectoriesOnWorker(configuration);
+        LOG.info("Calling filesetup to move Executables to worker.");
+        ExecutableFile executableFile = new ExecutableFile(configuration, 
binaryName);
+        FileUtils.copyFileFromGCSToWorker(executableFile);
+        downloadedFiles.add(binaryName);
+      }
+    }
+  }
+
+  public static synchronized void initSemaphore(Integer permits, String 
binaryName) {
+    if (!semaphores.containsKey(binaryName)) {
+      LOG.info(String.format(String.format("Initialized Semaphore for binary 
%s ",  binaryName)));
+      semaphores.put(binaryName, new Semaphore(permits));
+    }
+  }
+
+  private static void aquireSemaphore(String binaryName) throws 
IllegalStateException {
+    if  (!semaphores.containsKey(binaryName)) {
+      throw new IllegalStateException("Semaphore is NULL, check init logic in 
@Setup.");
+    }
+    try {
+      semaphores.get(binaryName).acquire();
+    } catch (InterruptedException ex) {
+      LOG.error("Interupted during aquire", ex);
+    }
+  }
+
+  private static void releaseSemaphore(String binaryName) throws 
IllegalStateException {
+    if  (!semaphores.containsKey(binaryName)) {
+      throw new IllegalStateException("Semaphore is NULL, check init logic in 
@Setup.");
+    }
+    semaphores.get(binaryName).release();
+  }
+
+  /**
+   * Permit class for access to worker cpu resources.
+   */
+  public static class Permit implements AutoCloseable {
+
+    private String binaryName;
+
+    public Permit(String binaryName){
+      this.binaryName = binaryName;
+      CallingSubProcessUtils.aquireSemaphore(binaryName);
+    }
+
+    @Override
+    public void close() {
+      CallingSubProcessUtils.releaseSemaphore(binaryName);
+    }
+
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/ExecutableFile.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/ExecutableFile.java
new file mode 100644
index 00000000000..a5a789250cb
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/ExecutableFile.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.utils;
+
+import 
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.sdk.coders.AvroCoder;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains the configuration for the external library: where its executable is copied from
+ * (under the configured source path) and where it is copied to (under the worker path).
+ */
+@DefaultCoder(AvroCoder.class)
+public class ExecutableFile {
+
+  String fileName;
+
+  private String sourceGCSLocation;
+  private String destinationLocation;
+
+  static final Logger LOG = LoggerFactory.getLogger(ExecutableFile.class);
+
+  // Reflect-based AvroCoder decoding needs a no-arg constructor to instantiate this class.
+  private ExecutableFile() {}
+
+  /**
+   * Resolves the source and destination locations of {@code fileName}.
+   *
+   * @param configuration supplies the source path and the worker path
+   * @param fileName file name of the executable
+   * @throws IllegalStateException if either argument is null
+   */
+  public ExecutableFile(SubProcessConfiguration configuration, String fileName)
+      throws IllegalStateException {
+    if (configuration == null) {
+      throw new IllegalStateException("Configuration can not be NULL");
+    }
+    if (fileName == null) {
+      throw new IllegalStateException("FileName can not be NULL");
+    }
+    this.fileName = fileName;
+    initSourceLocation(configuration);
+    initDestinationLocation(configuration);
+  }
+
+  public String getSourceGCSLocation() {
+    return sourceGCSLocation;
+  }
+
+  public void setSourceGCSLocation(String sourceGCSLocation) {
+    this.sourceGCSLocation = sourceGCSLocation;
+  }
+
+  public String getDestinationLocation() {
+    return destinationLocation;
+  }
+
+  public void setDestinationLocation(String destinationLocation) {
+    this.destinationLocation = destinationLocation;
+  }
+
+  // Source: fileName under the configured source path (presumably a GCS location, per the name).
+  private void initSourceLocation(SubProcessConfiguration configuration) {
+    this.sourceGCSLocation =
+        FileUtils.getFileResourceId(configuration.getSourcePath(), fileName).toString();
+  }
+
+  // Destination: fileName under the configured worker path.
+  private void initDestinationLocation(SubProcessConfiguration configuration) {
+    this.destinationLocation =
+        FileUtils.getFileResourceId(configuration.getWorkerPath(), fileName).toString();
+  }
+}
diff --git a/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/FileUtils.java b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/FileUtils.java
new file mode 100644
index 00000000000..07e118b938d
--- /dev/null
+++ b/examples/java/src/main/java/org/apache/beam/examples/subprocess/utils/FileUtils.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess.utils;
+
+import java.io.BufferedReader;
+import java.io.FileNotFoundException;
+import java.io.FileReader;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.FileAlreadyExistsException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import 
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utilities for dealing with movement of files from object stores and workers.
+ */
+public class FileUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(FileUtils.class);
+
+  public static ResourceId getFileResourceId(String directory, String 
fileName) {
+    ResourceId resourceID = FileSystems.matchNewResource(directory, true);
+    return resourceID.getCurrentDirectory().resolve(fileName, 
StandardResolveOptions.RESOLVE_FILE);
+  }
+
+  public static String toStringParams(ProcessBuilder builder) {
+    return (String.join(",", builder.command()));
+  }
+
+  public static String copyFileFromWorkerToGCS(SubProcessConfiguration 
configuration,
+      Path fileToUpload) throws Exception {
+
+    Path fileName;
+
+    if ((fileName = fileToUpload.getFileName()) == null) {
+      throw new IllegalArgumentException("FileName can not be null.");
+    }
+
+    ResourceId sourceFile = getFileResourceId(configuration.getWorkerPath(), 
fileName.toString());
+
+    LOG.info("Copying file from worker " + sourceFile);
+
+    ResourceId destinationFile =
+        getFileResourceId(configuration.getSourcePath(), fileName.toString());
+    // TODO currently not supported with different schemas for example GCS to 
local, else could use
+    // FileSystems.copy(ImmutableList.of(sourceFile), 
ImmutableList.of(destinationFile));
+    try {
+      return copyFile(sourceFile, destinationFile);
+    } catch (Exception ex) {
+      LOG.error(String.format("Error copying file from %s  to %s", sourceFile, 
destinationFile),
+          ex);
+      throw ex;
+    }
+  }
+
+  public static String copyFileFromGCSToWorker(ExecutableFile execuableFile) 
throws Exception {
+
+    ResourceId sourceFile =
+        FileSystems.matchNewResource(execuableFile.getSourceGCSLocation(), 
false);
+    ResourceId destinationFile =
+        FileSystems.matchNewResource(execuableFile.getDestinationLocation(), 
false);
+    try {
+      LOG.info(String.format("Moving File %s to %s ", 
execuableFile.getSourceGCSLocation(),
+          execuableFile.getDestinationLocation()));
+      Path path = Paths.get(execuableFile.getDestinationLocation());
+
+      if (path.toFile().exists()) {
+        LOG.warn(String.format(
+            "Overwriting file %s, should only see this once per worker.",
+            execuableFile.getDestinationLocation()));
+      }
+      copyFile(sourceFile, destinationFile);
+      path.toFile().setExecutable(true);
+      return path.toString();
+
+    } catch (Exception ex) {
+      LOG.error(String.format("Error moving file : %s ", 
execuableFile.fileName), ex);
+      throw ex;
+    }
+  }
+
+  public static String copyFile(ResourceId sourceFile, ResourceId 
destinationFile)
+      throws IOException {
+
+    try (WritableByteChannel writeChannel = 
FileSystems.create(destinationFile, "text/plain")) {
+      try (ReadableByteChannel readChannel = FileSystems.open(sourceFile)) {
+
+        final ByteBuffer buffer = ByteBuffer.allocateDirect(16 * 1024);
+        while (readChannel.read(buffer) != -1) {
+          buffer.flip();
+          writeChannel.write(buffer);
+          buffer.compact();
+        }
+        buffer.flip();
+        while (buffer.hasRemaining()) {
+          writeChannel.write(buffer);
+        }
+      }
+    }
+
+    return destinationFile.toString();
+  }
+
+  /**
+   * Create directories needed based on configuration.
+   * @param configuration
+   * @throws IOException
+   */
+  public static void createDirectoriesOnWorker(SubProcessConfiguration 
configuration)
+      throws IOException {
+
+    try {
+
+      Path path = Paths.get(configuration.getWorkerPath());
+
+      if (!path.toFile().exists()) {
+        Files.createDirectories(path);
+        LOG.info(String.format("Created Folder %s ", path.toFile()));
+      }
+    } catch (FileAlreadyExistsException ex) {
+      LOG.warn(String.format(
+          " Tried to create folder %s which already existsed, this should not 
happen!",
+          configuration.getWorkerPath()), ex);
+    }
+  }
+
+  public static String readLineOfLogFile(Path path) {
+
+    try (BufferedReader br = new BufferedReader(new 
FileReader(path.toString()))) {
+      return br.readLine();
+    } catch (FileNotFoundException e) {
+      LOG.error("Error reading the first line of file", e);
+    } catch (IOException e) {
+      LOG.error("Error reading the first line of file", e);
+    }
+
+    // `return empty string rather than NULL string as this data is often used 
in further logging
+    return "";
+  }
+}
diff --git a/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java
new file mode 100644
index 00000000000..4a9bec7380a
--- /dev/null
+++ b/examples/java/src/test/java/org/apache/beam/examples/subprocess/ExampleEchoPipelineTest.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.subprocess;
+
+import com.google.common.collect.ImmutableList;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.List;
+import 
org.apache.beam.examples.subprocess.configuration.SubProcessConfiguration;
+import org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs;
+import 
org.apache.beam.examples.subprocess.kernel.SubProcessCommandLineArgs.Command;
+import org.apache.beam.examples.subprocess.kernel.SubProcessKernel;
+import org.apache.beam.examples.subprocess.utils.CallingSubProcessUtils;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.GcsUtil;
+import org.apache.beam.sdk.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * To keep {@link org.apache.beam.examples.subprocess.ExampleEchoPipeline} 
simple,
+ * it is not factored or testable. This test file should be maintained with a 
copy of its
+ * code for a basic smoke test.
+ **/
+public class ExampleEchoPipelineTest {
+
+  static final Logger LOG = 
LoggerFactory.getLogger(ExampleEchoPipelineTest.class);
+
+  @Rule public TestPipeline p = 
TestPipeline.create().enableAbandonedNodeEnforcement(false);
+
+  @Test public void testExampleEchoPipeline() throws Exception {
+
+    // Create two Bash files as tests for the binary files
+
+    Path fileA = Files.createTempFile("test-Echo", ".sh");
+    Path fileB = Files.createTempFile("test-EchoAgain", ".sh");
+
+    Path workerTempFiles = Files.createTempDirectory("test-Echoo");
+
+    try (SeekableByteChannel channel =
+        FileChannel.open(fileA, StandardOpenOption.CREATE, 
StandardOpenOption.WRITE)) {
+      channel.write(ByteBuffer.wrap(getTestShellEcho().getBytes()));
+    }
+
+    try (SeekableByteChannel channel =
+        FileChannel.open(fileB, StandardOpenOption.CREATE, 
StandardOpenOption.WRITE)) {
+        channel.write(ByteBuffer.wrap(getTestShellEchoAgain().getBytes()));
+    }
+
+
+    // Read in the options for the pipeline
+    SubProcessPipelineOptions options = PipelineOptionsFactory
+        .as(SubProcessPipelineOptions.class);
+
+    options.setConcurrency(2);
+    options.setSourcePath(fileA.getParent().toString());
+    options.setWorkerPath(workerTempFiles.toAbsolutePath().toString());
+
+    p.getOptions().as(GcsOptions.class).setGcsUtil(buildMockGcsUtil());
+
+    // Setup the Configuration option used with all transforms
+    SubProcessConfiguration configuration = 
options.getSubProcessConfiguration();
+
+    // Create some sample data to be fed to our c++ Echo library
+    List<KV<String, String>> sampleData = new ArrayList<>();
+
+    for (int i = 0; i < 100; i++) {
+      String str = String.valueOf(i);
+      sampleData.add(KV.of(str, str));
+    }
+
+    // Define the pipeline which is two transforms echoing the inputs out to 
Logs
+    // For this use case we will make use of two shell files instead of the 
binary to make
+    // testing easier
+    PCollection<KV<String, String>> output = p.apply(Create.of(sampleData))
+        .apply("Echo inputs round 1",
+            ParDo.of(new EchoInputDoFn(configuration, 
fileA.getFileName().toString())))
+        .apply("Echo inputs round 2",
+            ParDo.of(new EchoInputDoFn(configuration, 
fileB.getFileName().toString())));
+
+    PAssert.that(output).containsInAnyOrder(sampleData);
+
+    p.run();
+
+
+  }
+
+  /**
+   * Simple DoFn that echos the element, used as an example of running a C++ 
library.
+   */
+  @SuppressWarnings("serial") @RunWith(JUnit4.class) public static class 
EchoInputDoFn
+      extends DoFn<KV<String, String>, KV<String, String>> {
+
+    static final Logger LOG = LoggerFactory.getLogger(EchoInputDoFn.class);
+
+    private SubProcessConfiguration configuration;
+    private String binaryName;
+
+    public EchoInputDoFn(SubProcessConfiguration configuration, String binary) 
{
+      // Pass in configuration information the name of the filename of the 
sub-process and the level
+      // of concurrency
+      this.configuration = configuration;
+      this.binaryName = binary;
+    }
+
+    @Setup public void setUp() throws Exception {
+      CallingSubProcessUtils.setUp(configuration, binaryName);
+    }
+
+    @ProcessElement public void processElement(ProcessContext c) throws 
Exception {
+      try {
+        // Our Library takes a single command in position 0 which it will echo 
back in the result
+        SubProcessCommandLineArgs commands = new SubProcessCommandLineArgs();
+        Command command = new Command(0, 
String.valueOf(c.element().getValue()));
+        commands.putCommand(command);
+
+        // The ProcessingKernel deals with the execution of the process
+        SubProcessKernel kernel = new SubProcessKernel(configuration, 
binaryName);
+
+        // Run the command and work through the results
+        List<String> results = kernel.exec(commands);
+        for (String s : results) {
+          c.output(KV.of(c.element().getKey(), s));
+        }
+      } catch (Exception ex) {
+        LOG.error("Error processing element ", ex);
+        throw ex;
+      }
+    }
+  }
+
+  private static String getTestShellEcho(){
+    return "#!/bin/sh\n" + "filename=$1;\n" + "echo $2 >> $filename;";
+  }
+
+  private static String getTestShellEchoAgain(){
+    return "#!/bin/sh\n" + "filename=$1;\n" + "echo $2 >> $filename;";
+  }
+
+  private GcsUtil buildMockGcsUtil() throws IOException {
+    GcsUtil mockGcsUtil = Mockito.mock(GcsUtil.class);
+
+    // Any request to open gets a new bogus channel
+    Mockito.when(mockGcsUtil.open(Mockito.any(GcsPath.class)))
+        .then(new Answer<SeekableByteChannel>() {
+
+          @Override public SeekableByteChannel answer(InvocationOnMock 
invocation)
+              throws Throwable {
+            return FileChannel.open(Files.createTempFile("channel-", ".tmp"),
+                StandardOpenOption.CREATE,
+                StandardOpenOption.DELETE_ON_CLOSE);
+          }
+        });
+
+    // Any request for expansion returns a list containing the original GcsPath
+    // This is required to pass validation that occurs in TextIO during apply()
+    Mockito.when(mockGcsUtil.expand(Mockito.any(GcsPath.class))).then(new 
Answer<List<GcsPath>>() {
+
+      @Override public List<GcsPath> answer(InvocationOnMock invocation) 
throws Throwable {
+        return ImmutableList.of((GcsPath) invocation.getArguments()[0]);
+      }
+    });
+
+    return mockGcsUtil;
+  }
+
+    }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 90002)
    Time Spent: 40m  (was: 0.5h)

> Add examples of running external libraries on workers
> -----------------------------------------------------
>
>                 Key: BEAM-2871
>                 URL: https://issues.apache.org/jira/browse/BEAM-2871
>             Project: Beam
>          Issue Type: New Feature
>          Components: examples-java
>    Affects Versions: 2.1.0
>            Reporter: Reza ardeshir rokni
>            Assignee: Reuven Lax
>            Priority: Major
>          Time Spent: 40m
>  Remaining Estimate: 0h
>
> Provide examples of how to run external libraries, such as C++ code, within a
> DoFn.
> More details of this pattern can be viewed in the blog:
> https://cloud.google.com/blog/big-data/2017/07/running-external-libraries-with-cloud-dataflow-for-grid-computing-workloads



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to