This is an automated email from the ASF dual-hosted git repository.
damondouglas pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new bdd5fff78c8 [Prism] Implement PrismPipelineResult (#31937)
bdd5fff78c8 is described below
commit bdd5fff78c84e45e6cc95d9dc4a1871bc39cf20f
Author: Damon <[email protected]>
AuthorDate: Fri Jul 19 15:22:25 2024 -0700
[Prism] Implement PrismPipelineResult (#31937)
* Implement PrismPipelineResult
* Add isAlive checks
---
runners/prism/java/build.gradle | 2 +
.../apache/beam/runners/prism/PrismExecutor.java | 8 ++
.../beam/runners/prism/PrismPipelineResult.java | 109 +++++++++++++++++
.../runners/prism/PrismPipelineResultTest.java | 130 +++++++++++++++++++++
4 files changed, 249 insertions(+)
diff --git a/runners/prism/java/build.gradle b/runners/prism/java/build.gradle
index dfc863e8f63..93d151f3e05 100644
--- a/runners/prism/java/build.gradle
+++ b/runners/prism/java/build.gradle
@@ -29,10 +29,12 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(":runners:portability:java")
+ implementation library.java.joda_time
implementation library.java.slf4j_api
implementation library.java.vendored_guava_32_1_2_jre
testImplementation library.java.junit
+ testImplementation library.java.mockito_core
testImplementation library.java.truth
}
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java
index fba2eec99c5..620d5508f22 100644
--- a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismExecutor.java
@@ -87,6 +87,14 @@ abstract class PrismExecutor {
}
}
+  /**
+   * Reports whether the Prism executable is running: {@code false} when {@code process} is {@code
+   * null}, otherwise the result of {@link Process#isAlive()}.
+   */
+  boolean isAlive() {
+    return process != null && process.isAlive();
+  }
+
/**
* Execute the {@link ProcessBuilder} that starts the Prism service.
Redirects output to STDOUT.
*/
diff --git a/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java
new file mode 100644
index 00000000000..a551196c9b6
--- /dev/null
+++ b/runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismPipelineResult.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+
+/**
+ * The {@link PipelineResult} of executing a {@link org.apache.beam.sdk.Pipeline} using the {@link
+ * PrismRunner} and an internal {@link PipelineResult} delegate.
+ *
+ * <p>{@link #cancel()}, {@link #waitUntilFinish()} and {@link #waitUntilFinish(Duration)} record
+ * the delegate's resulting state and metrics and then run the supplied cancel callback. Later calls
+ * to {@link #getState()} and {@link #metrics()} return the recorded values when they are present.
+ */
+class PrismPipelineResult implements PipelineResult {
+
+  /**
+   * Instantiates a {@link PrismPipelineResult} whose cancel callback is {@link
+   * PrismExecutor#stop()} of the given {@code executor}.
+   */
+  static PrismPipelineResult of(PipelineResult delegate, PrismExecutor executor) {
+    return new PrismPipelineResult(delegate, executor::stop);
+  }
+
+  private final PipelineResult delegate;
+  private final Runnable cancel;
+  private @Nullable MetricResults terminalMetrics;
+  private @Nullable State terminalState;
+
+  /**
+   * Instantiate the {@link PipelineResult} from the {@code delegate} and a {@code cancel} to be
+   * called when stopping the underlying executable Job management service.
+   */
+  PrismPipelineResult(PipelineResult delegate, Runnable cancel) {
+    this.delegate = delegate;
+    this.cancel = cancel;
+  }
+
+  /**
+   * Returns the state recorded by an earlier {@link #cancel()} or {@code waitUntilFinish} call when
+   * one is present; otherwise forwards the result of the delegate {@link PipelineResult#getState}.
+   */
+  @Override
+  public State getState() {
+    if (terminalState != null) {
+      return terminalState;
+    }
+    return delegate.getState();
+  }
+
+  /**
+   * Forwards the result of the delegate {@link PipelineResult#cancel}. Invokes the cancel callback
+   * (see {@link PrismExecutor#stop()}) before returning the resulting {@link
+   * org.apache.beam.sdk.PipelineResult.State}, and also when the delegate throws.
+   *
+   * @throws IOException if the delegate's {@link PipelineResult#cancel} throws it
+   */
+  @Override
+  public State cancel() throws IOException {
+    try {
+      return recordTerminal(delegate.cancel());
+    } finally {
+      // Run the callback even on failure so the executable is not left running.
+      this.cancel.run();
+    }
+  }
+
+  /**
+   * Forwards the result of the delegate {@link PipelineResult#waitUntilFinish(Duration)}. Invokes
+   * the cancel callback (see {@link PrismExecutor#stop()}) before returning the resulting {@link
+   * org.apache.beam.sdk.PipelineResult.State}, and also when the delegate throws.
+   */
+  @Override
+  public State waitUntilFinish(Duration duration) {
+    try {
+      return recordTerminal(delegate.waitUntilFinish(duration));
+    } finally {
+      // Run the callback even on failure so the executable is not left running.
+      this.cancel.run();
+    }
+  }
+
+  /**
+   * Forwards the result of the delegate {@link PipelineResult#waitUntilFinish}. Invokes the cancel
+   * callback (see {@link PrismExecutor#stop()}) before returning the resulting {@link
+   * org.apache.beam.sdk.PipelineResult.State}, and also when the delegate throws.
+   */
+  @Override
+  public State waitUntilFinish() {
+    try {
+      return recordTerminal(delegate.waitUntilFinish());
+    } finally {
+      // Run the callback even on failure so the executable is not left running.
+      this.cancel.run();
+    }
+  }
+
+  /**
+   * Returns the metrics recorded by an earlier {@link #cancel()} or {@code waitUntilFinish} call
+   * when present; otherwise forwards the result of the delegate {@link PipelineResult#metrics}.
+   */
+  @Override
+  public MetricResults metrics() {
+    if (terminalMetrics != null) {
+      return terminalMetrics;
+    }
+    return delegate.metrics();
+  }
+
+  /**
+   * Records the delegate's current metrics together with {@code state} and returns {@code state}.
+   * Callers invoke this before running the cancel callback, so the metrics are read first.
+   */
+  private State recordTerminal(State state) {
+    this.terminalMetrics = delegate.metrics();
+    this.terminalState = state;
+    return state;
+  }
+}
diff --git a/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java
new file mode 100644
index 00000000000..2ad7e2eb3dd
--- /dev/null
+++ b/runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismPipelineResultTest.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.prism;
+
+import static com.google.common.truth.Truth.assertThat;
+import static
org.apache.beam.runners.prism.PrismRunnerTest.getLocalPrismBuildOrIgnoreTest;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link PrismPipelineResult}.
+ *
+ * <p>Each test starts a {@link PrismExecutor} in {@link #setUp()} and {@link #tearDown()} asserts
+ * that it is no longer alive, so every test must stop the executor, either through the {@link
+ * PrismPipelineResult} under test or by calling {@link PrismExecutor#stop()} directly.
+ */
+@RunWith(JUnit4.class)
+public class PrismPipelineResultTest {
+
+  final PrismExecutor exec = executor();
+
+  /** Starts the Prism executable and checks that it is alive before each test. */
+  @Before
+  public void setUp() throws IOException {
+    exec.execute();
+    assertThat(exec.isAlive()).isTrue();
+  }
+
+  /** Verifies the test stopped the executable, then stops it regardless of the outcome. */
+  @After
+  public void tearDown() {
+    try {
+      assertThat(exec.isAlive()).isFalse();
+    } finally {
+      // A failed test may never have reached its stop call; do not leave the process running.
+      exec.stop();
+    }
+  }
+
+  /** After {@code waitUntilFinish}, {@code getState} reports the state the delegate returned. */
+  @Test
+  public void givenTerminated_reportsState() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.FAILED);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    // Assigns terminal state.
+    underTest.waitUntilFinish();
+    assertThat(underTest.getState()).isEqualTo(PipelineResult.State.FAILED);
+  }
+
+  /** Without a prior terminating call, {@code getState} forwards the delegate's state. */
+  @Test
+  public void givenNotTerminated_reportsState() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.getState()).thenReturn(PipelineResult.State.RUNNING);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    assertThat(underTest.getState()).isEqualTo(PipelineResult.State.RUNNING);
+    // Nothing above stops the executable; stop it so tearDown's assertion holds.
+    exec.stop();
+  }
+
+  /** {@code cancel} returns the delegate's state; tearDown checks the executable stopped. */
+  @Test
+  public void cancelStopsExecutable_reportsTerminalState() throws IOException {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.cancel()).thenReturn(PipelineResult.State.CANCELLED);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.CANCELLED);
+  }
+
+  /** {@code cancel} returns whatever state the delegate reports, here {@code FAILED}. */
+  @Test
+  public void givenTerminated_cancelIsNoop_reportsTerminalState() throws IOException {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.cancel()).thenReturn(PipelineResult.State.FAILED);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    assertThat(underTest.cancel()).isEqualTo(PipelineResult.State.FAILED);
+  }
+
+  /** {@code waitUntilFinish(Duration)} returns the state the delegate reports. */
+  @Test
+  public void givenPipelineRunWithDuration_waitUntilFinish_reportsTerminalState() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.waitUntilFinish(Duration.millis(3000L)))
+        .thenReturn(PipelineResult.State.CANCELLED);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    assertThat(underTest.waitUntilFinish(Duration.millis(3000L)))
+        .isEqualTo(PipelineResult.State.CANCELLED);
+  }
+
+  /** A second {@code waitUntilFinish} call still reports the delegate's state. */
+  @Test
+  public void givenTerminated_waitUntilFinishIsNoop_reportsTerminalState() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    // Terminate Job as setup for additional call.
+    underTest.waitUntilFinish();
+    assertThat(underTest.waitUntilFinish()).isEqualTo(PipelineResult.State.DONE);
+  }
+
+  /** Without a prior terminating call, {@code metrics} forwards the delegate's metrics. */
+  @Test
+  public void givenNotTerminated_reportsMetrics() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.metrics()).thenReturn(mock(MetricResults.class));
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    assertThat(underTest.metrics()).isNotNull();
+    // Nothing above stops the executable; stop it so tearDown's assertion holds.
+    exec.stop();
+  }
+
+  /** After {@code waitUntilFinish}, {@code metrics} still returns a non-null result. */
+  @Test
+  public void givenTerminated_reportsTerminatedMetrics() {
+    PipelineResult delegate = mock(PipelineResult.class);
+    when(delegate.metrics()).thenReturn(mock(MetricResults.class));
+    when(delegate.waitUntilFinish()).thenReturn(PipelineResult.State.DONE);
+    PrismPipelineResult underTest = new PrismPipelineResult(delegate, exec::stop);
+    // Terminate Job as setup for additional call.
+    underTest.waitUntilFinish();
+    assertThat(underTest.metrics()).isNotNull();
+  }
+
+  /** Builds a {@link PrismExecutor} for the locally built Prism binary, or ignores the test. */
+  private static PrismExecutor executor() {
+    return PrismExecutor.builder().setCommand(getLocalPrismBuildOrIgnoreTest()).build();
+  }
+}