This is an automated email from the ASF dual-hosted git repository.

damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new bdd5fff78c8 [Prism] Implement PrismPipelineResult (#31937)
bdd5fff78c8 is described below

commit bdd5fff78c84e45e6cc95d9dc4a1871bc39cf20f
Author: Damon <[email protected]>
AuthorDate: Fri Jul 19 15:22:25 2024 -0700

    [Prism] Implement PrismPipelineResult (#31937)
    
    * Implement PrismPipelineResult
    
    * Add isAlive checks
---
 runners/prism/java/build.gradle                    |   2 +
 .../apache/beam/runners/prism/PrismExecutor.java   |   8 ++
 .../beam/runners/prism/PrismPipelineResult.java    | 109 +++++++++++++++++
 .../runners/prism/PrismPipelineResultTest.java     | 130 +++++++++++++++++++++
 4 files changed, 249 insertions(+)

diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index dfc863e8f63..93d151f3e05 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -29,10 +29,12 @@ dependencies {
     implementation project(path: ":sdks:java:core", configuration: "shadow")
     implementation project(":runners:portability:java")
 
+    implementation library.java.joda_time
     implementation library.java.slf4j_api
     implementation library.java.vendored_guava_32_1_2_jre
 
     testImplementation library.java.junit
+    testImplementation library.java.mockito_core
     testImplementation library.java.truth
 }
 
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java
index fba2eec99c5..620d5508f22 100644
--- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java
@@ -87,6 +87,14 @@ abstract class PrismExecutor {
     }
   }
 
+  /**
+   * Reports whether the Prism executable is running: {@code false} when no process reference is
+   * held, otherwise the result of {@link Process#isAlive()}.
+   */
+  boolean isAlive() {
+    return process != null && process.isAlive();
+  }
+
   /**
    * Execute the {@link ProcessBuilder} that starts the Prism service. Redirects output to STDOUT.
    */
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java
new file mode 100644
index 00000000000..a551196c9b6
--- /dev/null
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * The {@link PipelineResult} of executing a {@link org.apache.beam.sdk.Pipeline} using the {@link
+ * PrismRunner} and an internal {@link PipelineResult} delegate.
+ *
+ * <p>{@link #cancel()} and both {@code waitUntilFinish} variants record the delegate's state and
+ * metrics and then run the cleanup action. Afterwards {@link #getState()} and {@link #metrics()}
+ * return the recorded values whenever those are non-null.
+ */
+class PrismPipelineResult implements PipelineResult {
+
+  /**
+   * Creates a {@link PrismPipelineResult} whose cleanup action is {@link PrismExecutor#stop()} of
+   * the given {@code executor}.
+   */
+  static PrismPipelineResult of(PipelineResult delegate, PrismExecutor executor) {
+    return new PrismPipelineResult(delegate, executor::stop);
+  }
+
+  private final PipelineResult delegate;
+  private final Runnable cancel;
+  private @Nullable MetricResults terminalMetrics;
+  private @Nullable State terminalState;
+
+  /**
+   * Instantiate the {@link PipelineResult} from the {@code delegate} and a {@code cancel} action
+   * to be called when stopping the underlying executable Job management service.
+   */
+  PrismPipelineResult(PipelineResult delegate, Runnable cancel) {
+    this.delegate = delegate;
+    this.cancel = cancel;
+  }
+
+  /**
+   * Returns the state recorded by a previous {@link #cancel()} or {@code waitUntilFinish} call
+   * when one is available; otherwise forwards the result of the delegate {@link
+   * PipelineResult#getState}.
+   */
+  @Override
+  public State getState() {
+    if (terminalState != null) {
+      return terminalState;
+    }
+    return delegate.getState();
+  }
+
+  /**
+   * Forwards the result of the delegate {@link PipelineResult#cancel}. Invokes {@link
+   * PrismExecutor#stop()} before returning the resulting {@link
+   * org.apache.beam.sdk.PipelineResult.State}, and also when the delegate throws.
+   *
+   * @throws IOException if the delegate fails to cancel; the cleanup action has run by then.
+   */
+  @Override
+  public State cancel() throws IOException {
+    try {
+      return recordTerminal(delegate.cancel());
+    } finally {
+      // Run the cleanup action even when the delegate throws so that the underlying service is
+      // not left running.
+      this.cancel.run();
+    }
+  }
+
+  /**
+   * Forwards the result of the delegate {@link PipelineResult#waitUntilFinish(Duration)}. Invokes
+   * {@link PrismExecutor#stop()} before returning the resulting {@link
+   * org.apache.beam.sdk.PipelineResult.State}, and also when the delegate throws.
+   */
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    // NOTE(review): the cleanup action runs even if the delegate returns before the pipeline
+    // reached a terminal state (e.g. when the duration elapses) - confirm this is intended.
+    try {
+      return recordTerminal(delegate.waitUntilFinish(duration));
+    } finally {
+      this.cancel.run();
+    }
+  }
+
+  /**
+   * Forwards the result of the delegate {@link PipelineResult#waitUntilFinish}. Invokes {@link
+   * PrismExecutor#stop()} before returning the resulting {@link
+   * org.apache.beam.sdk.PipelineResult.State}, and also when the delegate throws.
+   */
+  @Override
+  public State waitUntilFinish() {
+    try {
+      return recordTerminal(delegate.waitUntilFinish());
+    } finally {
+      this.cancel.run();
+    }
+  }
+
+  /**
+   * Returns the metrics recorded by a previous {@link #cancel()} or {@code waitUntilFinish} call
+   * when available; otherwise forwards the result of the delegate {@link PipelineResult#metrics}.
+   */
+  @Override
+  public MetricResults metrics() {
+    if (terminalMetrics != null) {
+      return terminalMetrics;
+    }
+    return delegate.metrics();
+  }
+
+  /**
+   * Records the delegate's current metrics and the given {@code state} so they can be reported
+   * after the cleanup action has run, then returns {@code state} unchanged.
+   */
+  private State recordTerminal(State state) {
+    this.terminalMetrics = delegate.metrics();
+    this.terminalState = state;
+    return state;
+  }
+}
diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java
new file mode 100644
index 00000000000..2ad7e2eb3dd
--- /dev/null
+++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import static com.google.common.truth.Truth.assertThat;
+import static 
org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link PrismPipelineResult}.
+ *
+ * <p>Each test starts a Prism executable in {@link #setUp()} and must leave it stopped, either
+ * through the {@link PrismPipelineResult} under test or by calling {@link PrismExecutor#stop()}
+ * directly; {@link #tearDown()} verifies this.
+ */
+@RunWith(JUnit4.class)
+public class PrismPipelineResultTest {
+
+  final PrismExecutor exec = executor();
+
+  /** Starts the Prism executable and checks that it is running before each test. */
+  @Before
+  public void setUp() throws IOException {
+    exec.execute();
+    assertThat(exec.isAlive()).isTrue();
+  }
+
+  /** Verifies that the test stopped the Prism executable, stopping it first if it did not. */
+  @After
+  public void tearDown() {
+    // Capture liveness before cleaning up: a test that failed to stop the executable is still
+    // reported as failing, but the process is not left running after the test finishes.
+    boolean stillAlive = exec.isAlive();
+    if (stillAlive) {
+      exec.stop();
+    }
+    assertThat(stillAlive).isFalse();
+  }
+
+  @Test
+  public void givenTerminated_reportsState() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.FAILED);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    // Assigns terminal state.
+    underTest.waitUntilFinish();
+    assertThat(underTest.getState()).isEqualTo(PipelineResult.State.FAILED);
+  }
+
+  @Test
+  public void givenNotTerminated_reportsState() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.getState()).thenReturn(PipelineResult.State.RUNNING);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    assertThat(underTest.getState()).isEqualTo(PipelineResult.State.RUNNING);
+    // getState() does not stop the executable, so stop it explicitly for tearDown.
+    exec.stop();
+  }
+
+  @Test
+  public void cancelStopsExecutable_reportsTerminalState() throws IOException {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.cancel()).thenReturn(PipelineResult.State.CANCELLED);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.CANCELLED);
+  }
+
+  @Test
+  public void givenTerminated_cancelIsNoop_reportsTerminalState() throws IOException {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.cancel()).thenReturn(PipelineResult.State.FAILED);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.FAILED);
+  }
+
+  @Test
+  public void givenPipelineRunWithDuration_waitUntilFinish_reportsTerminalState() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.waitUntilFinish(Duration.millis(3000L)))
+        .thenReturn(PipelineResult.State.CANCELLED);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    assertThat(underTest.waitUntilFinish(Duration.millis(3000L)))
+        .isEqualTo(PipelineResult.State.CANCELLED);
+  }
+
+  @Test
+  public void givenTerminated_waitUntilFinishIsNoop_reportsTerminalState() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    // Terminate Job as setup for additional call.
+    underTest.waitUntilFinish();
+    assertThat(underTest.waitUntilFinish()).isEqualTo(PipelineResult.State.DONE);
+  }
+
+  @Test
+  public void givenNotTerminated_reportsMetrics() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.metrics()).thenReturn(mock(MetricResults.class));
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    assertThat(underTest.metrics()).isNotNull();
+    // metrics() does not stop the executable, so stop it explicitly for tearDown.
+    exec.stop();
+  }
+
+  @Test
+  public void givenTerminated_reportsTerminatedMetrics() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.metrics()).thenReturn(mock(MetricResults.class));
+    when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    // Terminate Job as setup for additional call.
+    underTest.waitUntilFinish();
+    assertThat(underTest.metrics()).isNotNull();
+  }
+
+  /** Builds a {@link PrismExecutor} for the local Prism build, or ignores the test without one. */
+  private static PrismExecutor executor() {
+    return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest()).build();
+  }
+}

Reply via email to