This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 81538672cfe Support class executes the Prism binary (#31795)
81538672cfe is described below

commit 81538672cfea253d4965063ac4ca12233203aa06
Author: Damon <[email protected]>
AuthorDate: Tue Jul 9 16:09:17 2024 -0700

    Support class executes the Prism binary (#31795)
    
    * Stage PrismRunner implementation and dependencies
    
    * A Java support class executes the Prism binary
    
    * Sync with head
    
    * Remove pid
---
 .../apache/beam/runners/prism/PrismExecutor.java   | 160 +++++++++++++++++++++
 .../beam/runners/prism/PrismExecutorTest.java      |  99 +++++++++++++
 2 files changed, 259 insertions(+)

diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java
new file mode 100644
index 00000000000..fba2eec99c5
--- /dev/null
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import com.google.auto.value.AutoValue;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.ByteStreams;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link PrismExecutor} builds and executes a {@link ProcessBuilder} for use 
by the {@link
+ * PrismRunner}. Prism is a {@link 
org.apache.beam.runners.portability.PortableRunner} maintained at
+ * <a 
href="https://github.com/apache/beam/tree/master/sdks/go/cmd/prism";>sdks/go/cmd/prism</a>.
+ */
+@AutoValue
+abstract class PrismExecutor {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(PrismExecutor.class);
+
+  protected @MonotonicNonNull Process process;
+  protected ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+  protected @MonotonicNonNull Future<?> future = null;
+
+  static Builder builder() {
+    return new AutoValue_PrismExecutor.Builder();
+  }
+
+  /** The command to execute the Prism binary. */
+  abstract String getCommand();
+
+  /**
+   * Additional arguments to pass when invoking the Prism binary. Defaults to 
an {@link
+   * Collections#emptyList()}.
+   */
+  abstract List<String> getArguments();
+
+  /** Stops the execution of the {@link Process}, created as a result of 
{@link #execute}. */
+  void stop() {
+    LOG.info("Stopping Prism...");
+    if (future != null) {
+      future.cancel(true);
+    }
+    executorService.shutdown();
+    try {
+      boolean ignored = executorService.awaitTermination(1000L, 
TimeUnit.MILLISECONDS);
+    } catch (InterruptedException ignored) {
+    }
+    if (process == null) {
+      return;
+    }
+    if (!process.isAlive()) {
+      return;
+    }
+    process.destroy();
+    try {
+      process.waitFor();
+    } catch (InterruptedException ignored) {
+    }
+  }
+
+  /**
+   * Execute the {@link ProcessBuilder} that starts the Prism service. 
Redirects output to STDOUT.
+   */
+  void execute() throws IOException {
+    execute(createProcessBuilder().inheritIO());
+  }
+
+  /**
+   * Execute the {@link ProcessBuilder} that starts the Prism service. 
Redirects output to the
+   * {@param outputStream}.
+   */
+  void execute(OutputStream outputStream) throws IOException {
+    execute(createProcessBuilder().redirectErrorStream(true));
+    this.future =
+        executorService.submit(
+            () -> {
+              try {
+                ByteStreams.copy(checkStateNotNull(process).getInputStream(), 
outputStream);
+              } catch (IOException e) {
+                throw new RuntimeException(e);
+              }
+            });
+  }
+
+  /**
+   * Execute the {@link ProcessBuilder} that starts the Prism service. 
Redirects output to the
+   * {@param file}.
+   */
+  void execute(File file) throws IOException {
+    execute(
+        createProcessBuilder()
+            .redirectErrorStream(true)
+            .redirectOutput(ProcessBuilder.Redirect.appendTo(file)));
+  }
+
+  private void execute(ProcessBuilder processBuilder) throws IOException {
+    this.process = processBuilder.start();
+    LOG.info("started {}", String.join(" ", getCommandWithArguments()));
+  }
+
+  private List<String> getCommandWithArguments() {
+    List<String> commandWithArguments = new ArrayList<>();
+    commandWithArguments.add(getCommand());
+    commandWithArguments.addAll(getArguments());
+
+    return commandWithArguments;
+  }
+
+  private ProcessBuilder createProcessBuilder() {
+    return new ProcessBuilder(getCommandWithArguments());
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+
+    abstract Builder setCommand(String command);
+
+    abstract Builder setArguments(List<String> arguments);
+
+    abstract Optional<List<String>> getArguments();
+
+    abstract PrismExecutor autoBuild();
+
+    final PrismExecutor build() {
+      if (!getArguments().isPresent()) {
+        setArguments(Collections.emptyList());
+      }
+      return autoBuild();
+    }
+  }
+}
diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java
new file mode 100644
index 00000000000..315e585a0c5
--- /dev/null
+++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismExecutorTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import static com.google.common.truth.Truth.assertThat;
+import static 
org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.Collections;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link PrismExecutor}.
+ *
+ * <p>Each test starts the Prism binary resolved by {@code getLocalPrismBuildOrIgnoreTest} (which,
+ * judging by its name, presumably skips the test when no local build exists), waits a fixed
+ * interval for it to start, stops it, and then inspects whatever output was captured.
+ */
+@RunWith(JUnit4.class)
+public class PrismExecutorTest {
+  @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder();
+
+  @Rule public TestName testName = new TestName();
+
+  /** Starts Prism with inherited I/O and verifies it can be stopped without error. */
+  @Test
+  public void executeThenStop() throws IOException {
+    PrismExecutor executor = underTest().build();
+    executor.execute();
+    sleep(3000L);
+    executor.stop();
+  }
+
+  /** Captures Prism's output in memory and checks for the default job endpoint log line. */
+  @Test
+  public void executeWithStreamRedirectThenStop() throws IOException {
+    PrismExecutor executor = underTest().build();
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    executor.execute(outputStream);
+    sleep(3000L);
+    executor.stop();
+    String output = outputStream.toString(StandardCharsets.UTF_8.name());
+    assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:8073");
+  }
+
+  /** Redirects Prism's output to a file and checks for the default job endpoint log line. */
+  @Test
+  public void executeWithFileOutputThenStop() throws IOException {
+    PrismExecutor executor = underTest().build();
+    File log = temporaryFolder.newFile(testName.getMethodName());
+    executor.execute(log);
+    sleep(3000L);
+    executor.stop();
+    try (Stream<String> stream = Files.lines(log.toPath(), StandardCharsets.UTF_8)) {
+      String output = stream.collect(Collectors.joining("\n"));
+      assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:8073");
+    }
+  }
+
+  /** Passes a custom {@code -job_port} argument and checks that the logged endpoint uses it. */
+  @Test
+  public void executeWithCustomArgumentsThenStop() throws IOException {
+    PrismExecutor executor =
+        underTest().setArguments(Collections.singletonList("-job_port=5555")).build();
+    ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+    executor.execute(outputStream);
+    sleep(3000L);
+    executor.stop();
+    String output = outputStream.toString(StandardCharsets.UTF_8.name());
+    assertThat(output).contains("INFO Serving JobManagement endpoint=localhost:5555");
+  }
+
+  /** Returns a builder preconfigured with the command of the locally built Prism binary. */
+  private PrismExecutor.Builder underTest() {
+    return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest());
+  }
+
+  /**
+   * Pauses the current thread for {@code millis} milliseconds to give Prism time to start.
+   *
+   * <p>If interrupted, the wait ends early and the interrupt status is restored so the test
+   * runner can still observe the interruption.
+   */
+  private void sleep(long millis) {
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}

Reply via email to