lostluck commented on code in PR #32264:
URL: https://github.com/apache/beam/pull/32264#discussion_r1725308806
##########
runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismRunnerTest.java:
##########
@@ -17,48 +17,115 @@
*/
package org.apache.beam.runners.prism;
-import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assume.assumeTrue;
-import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link PrismRunner}. */
-
-// TODO(https://github.com/apache/beam/issues/31793): Remove @Ignore after
finalizing PrismRunner.
-// Depends on: https://github.com/apache/beam/issues/31402 and
-// https://github.com/apache/beam/issues/31792.
-@Ignore
@RunWith(JUnit4.class)
public class PrismRunnerTest {
+
// See build.gradle for test task configuration.
private static final String PRISM_BUILD_TARGET_PROPERTY_NAME =
"prism.buildTarget";
@Test
- public void givenBoundedSource_runsUntilDone() {
+ public void create() {
Pipeline pipeline = Pipeline.create(options());
- pipeline.apply(Create.of(1, 2, 3));
- PipelineResult.State state = pipeline.run().waitUntilFinish();
- assertThat(state).isEqualTo(PipelineResult.State.DONE);
+ PAssert.that(pipeline.apply(Create.of(1, 2, 3))).containsInAnyOrder(1, 2,
3);
+ pipeline.run();
}
+ @Ignore
@Test
- public void givenUnboundedSource_runsUntilCancel() throws IOException {
+ public void windowing() {
Pipeline pipeline = Pipeline.create(options());
- pipeline.apply(PeriodicImpulse.create());
- PipelineResult result = pipeline.run();
- assertThat(result.getState()).isEqualTo(PipelineResult.State.RUNNING);
- PipelineResult.State state = result.cancel();
- assertThat(state).isEqualTo(PipelineResult.State.CANCELLED);
+ PCollection<KV<String, Iterable<Integer>>> got =
+ pipeline
+ .apply(Create.of(1, 2, 100, 101, 102, 123))
+ .apply(WithTimestamps.of(t -> Instant.ofEpochSecond(t)))
+ .apply(WithKeys.of("k"))
+ .apply(Window.into(FixedWindows.of(Duration.standardSeconds(10))))
+ .apply(GroupByKey.create());
+
+ List<KV<String, Iterable<Integer>>> want =
+ Arrays.asList(
+ KV.of("k", Arrays.asList(1, 2)),
+ KV.of("k", Arrays.asList(100, 101, 102)),
+ KV.of("k", Collections.singletonList(123)));
+
+ PAssert.that(got).containsInAnyOrder(want);
+
+ pipeline.run();
+ }
+
+ @Ignore("Unable to find inbound timer receiver for instruction")
Review Comment:
This was fixed by one of my recent PRs.
##########
runners/prism/java/src/main/java/org/apache/beam/runners/prism/PrismRunner.java:
##########
@@ -72,15 +72,50 @@ public PipelineResult run(Pipeline pipeline) {
prismPipelineOptions.getDefaultEnvironmentType(),
prismPipelineOptions.getJobEndpoint());
- return internal.run(pipeline);
+ Runnable closer = runPrismAndProvideCloser(prismPipelineOptions);
+ try {
+ JobServicePipelineResult delegateResult = (JobServicePipelineResult)
internal.run(pipeline);
+ PrismPipelineResult result = new PrismPipelineResult(delegateResult,
closer);
+ CompletableFuture<?> ignored =
+
delegateResult.getTerminalStateFuture().whenComplete(result::onJobStateComplete);
+ if (isInvokedInTest()) {
+ LOG.info("invoking Pipeline::waitUntilFinish due to invoking in a
class named ^.*Test$");
+ result.waitUntilFinish();
+ }
+ return result;
+ } catch (RuntimeException e) {
+ closer.run();
+ throw e;
+ }
+ }
+
+ private boolean isInvokedInTest() {
+ for (StackTraceElement element : Thread.currentThread().getStackTrace()) {
+ if (element.getClassName().endsWith("Test")) {
+ return true;
+ }
+ }
+ return false;
}
- private static void assignDefaultsIfNeeded(PrismPipelineOptions
prismPipelineOptions) {
- if
(Strings.isNullOrEmpty(prismPipelineOptions.getDefaultEnvironmentType())) {
-
prismPipelineOptions.setDefaultEnvironmentType(Environments.ENVIRONMENT_LOOPBACK);
+ private static void assignDefaultsIfNeeded(PrismPipelineOptions options) {
+ if (Strings.isNullOrEmpty(options.getDefaultEnvironmentType())) {
+ options.setDefaultEnvironmentType(Environments.ENVIRONMENT_LOOPBACK);
}
- if (Strings.isNullOrEmpty(prismPipelineOptions.getJobEndpoint())) {
- prismPipelineOptions.setJobEndpoint(DEFAULT_PRISM_ENDPOINT);
+ if (Strings.isNullOrEmpty(options.getJobEndpoint())) {
+ options.setJobEndpoint(DEFAULT_PRISM_ENDPOINT);
Review Comment:
The default behavior shouldn't be a port that prism selects, but the one
this caller selects *for* prism. There's no guarantee that 8073 is ever free or
remains free.
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/fn/server/GrpcFnServer.java:
##########
@@ -150,14 +153,12 @@ public Server getServer() {
@Override
public void close() throws Exception {
+ LOG.info("Shutting down");
Review Comment:
We'll want to remove this debug logging at least, as this will apply to
*all* portable Java jobs for each FnAPI service.
It's not clear we actually want to include the changes in this file, as the
previous version was more robust and already called the various shutdown
sequences.
It also has a higher blast radius than the other cleanups you're applying.
##########
runners/prism/java/src/test/java/org/apache/beam/runners/prism/PrismRunnerTest.java:
##########
@@ -17,48 +17,115 @@
*/
package org.apache.beam.runners.prism;
-import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assume.assumeTrue;
-import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.WithTimestamps;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
/** Tests for {@link PrismRunner}. */
-
-// TODO(https://github.com/apache/beam/issues/31793): Remove @Ignore after
finalizing PrismRunner.
-// Depends on: https://github.com/apache/beam/issues/31402 and
-// https://github.com/apache/beam/issues/31792.
-@Ignore
@RunWith(JUnit4.class)
public class PrismRunnerTest {
+
// See build.gradle for test task configuration.
private static final String PRISM_BUILD_TARGET_PROPERTY_NAME =
"prism.buildTarget";
@Test
- public void givenBoundedSource_runsUntilDone() {
+ public void create() {
Pipeline pipeline = Pipeline.create(options());
- pipeline.apply(Create.of(1, 2, 3));
- PipelineResult.State state = pipeline.run().waitUntilFinish();
- assertThat(state).isEqualTo(PipelineResult.State.DONE);
+ PAssert.that(pipeline.apply(Create.of(1, 2, 3))).containsInAnyOrder(1, 2,
3);
+ pipeline.run();
}
+ @Ignore
Review Comment:
Why is this test being ignored?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]