[ 
https://issues.apache.org/jira/browse/BEAM-4201?focusedWorklogId=101503&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101503
 ]

ASF GitHub Bot logged work on BEAM-4201:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/May/18 02:33
            Start Date: 13/May/18 02:33
    Worklog Time Spent: 10m 
      Work Description: kennknowles commented on a change in pull request 
#5347: [BEAM-4201][SQL] Add integration test for PubsubIO JSON -> SQL
URL: https://github.com/apache/beam/pull/5347#discussion_r187787527
 
 

 ##########
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
 ##########
 @@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.sdk.io.gcp.pubsub.TestPubsub.createTopicName;
+
+import com.google.common.collect.ImmutableSet;
+import io.grpc.StatusRuntimeException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.state.SetState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * Test rule which observes elements and checks whether they match the success criteria.
+ */
+public class TestPubsubSignal implements TestRule {
+  // Result topic path template: project id, then the generated per-test topic name.
+  private static final String TOPIC_FORMAT = "projects/%s/topics/%s-result1";
+  // Subscription path template: project id, then the subscription name.
+  private static final String SUBSCRIPTION_FORMAT = 
"projects/%s/subscriptions/%s";
+  // Passed to PubsubGrpcClient.FACTORY.newClient as the id / timestamp attribute names;
+  // presumably null means "no such attribute" -- confirm against PubsubClient.
+  private static final String NO_ID_ATTRIBUTE = null;
+  private static final String NO_TIMESTAMP_ATTRIBUTE = null;
+
+  // Client for the currently running test. Non-null only between initializePubsub() and
+  // tearDown(); apply() uses a leftover non-null value to detect a missing teardown.
+  PubsubClient pubsub;
+  private TestPubsubOptions pipelineOptions;
+  // Path of the result topic created for the current test, or null if none has been created.
+  private String resultTopicPath;
+
+  /**
+   * Static factory for this rule.
+   *
+   * <p>The GCP configuration (such as the project) comes from
+   * {@link TestPipeline#testingPipelineOptions()}, viewed as {@link TestPubsubOptions}; see
+   * {@link TestPipelineOptions}.
+   */
+  public static TestPubsubSignal create() {
+    return new TestPubsubSignal(
+        TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class));
+  }
+
+  private TestPubsubSignal(TestPubsubOptions options) {
+    // Only the options are captured here; the Pubsub client is created per test in apply().
+    this.pipelineOptions = options;
+  }
+
+  /**
+   * Wraps {@code base} so that a dedicated result topic exists while the test body runs.
+   *
+   * <p>Before the test body runs, a Pubsub client and result topic are created. Afterwards the
+   * topic (if it was created) is deleted and the client is closed, even if the test body throws.
+   *
+   * <p>The returned statement throws {@link AssertionError} if a client from a previous test is
+   * still open, i.e. the previous test was not torn down.
+   */
+  @Override
+  public Statement apply(Statement base, Description description) {
+    return new Statement() {
+      @Override
+      public void evaluate() throws Throwable {
+        if (TestPubsubSignal.this.pubsub != null) {
+          throw new AssertionError(
+              "Pubsub client was not shutdown in previous test. "
+              + "Topic path is '" + resultTopicPath + "'. "
+              + "Current test: " + description.getDisplayName());
+        }
+
+        try {
+          initializePubsub(description);
+          base.evaluate();
+        } finally {
+          // Runs even when initialization fails part-way, so a created client is always closed.
+          tearDown();
+        }
+      }
+    };
+  }
+
+  /**
+   * Opens a gRPC Pubsub client and creates the result topic for the current test.
+   *
+   * <p>{@code resultTopicPath} is assigned only after the topic has been created, so
+   * {@code tearDown()} never tries to delete a topic that was not created.
+   */
+  private void initializePubsub(Description description) throws IOException {
+    pubsub =
+        PubsubGrpcClient.FACTORY.newClient(NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, pipelineOptions);
+
+    // Topic name follows createTopicName(description) plus the "-result1" suffix, e.g.:
+    //   integ-test-TestClassName-testMethodName-2018-12-11-23-32-333-<random-long>-result1
+    String candidatePath =
+        String.format(TOPIC_FORMAT, pipelineOptions.getProject(), createTopicName(description));
+    pubsub.createTopic(new TopicPath(candidatePath));
+
+    // Publish the path only once creation has succeeded.
+    resultTopicPath = candidatePath;
+  }
+
+  /**
+   * Deletes the result topic (if one was created) and closes the Pubsub client.
+   *
+   * <p>Does nothing when no client is open. Per-test state is reset in a {@code finally} block,
+   * so the client is closed even if deleting the topic fails.
+   */
+  private void tearDown() throws IOException {
+    if (pubsub != null) {
+      try {
+        String topicToDelete = resultTopicPath;
+        if (topicToDelete != null) {
+          pubsub.deleteTopic(new TopicPath(topicToDelete));
+        }
+      } finally {
+        pubsub.close();
+        pubsub = null;
+        resultTopicPath = null;
+      }
+    }
+  }
+
+  /**
+   * Returns a transform that publishes a success signal once {@code successPredicate} holds.
+   *
+   * <p>The predicate receives the set of events captured so far and returns {@code true} when
+   * that set satisfies the success criteria. While it returns {@code false}, it is re-evaluated
+   * as each further event becomes available. Once it returns {@code true}, success is signaled
+   * and {@link #waitForSuccess(Duration)} unblocks. If it throws, failure is signaled and
+   * {@link #waitForSuccess(Duration)} likewise unblocks.
+   *
+   * <p>The result topic path is read when this method is called, so call it while the rule is
+   * active (from within the test method).
+   */
+  public <T> PTransform<PCollection<? extends T>, POutput> signalSuccessWhen(
+      Coder<T> coder,
+      SerializableFunction<Set<T>, Boolean> successPredicate) {
+    String topicForThisTest = resultTopicPath;
+    return new PublishSuccessWhen<T>(coder, successPredicate, topicForThisTest);
+  }
+
+  /**
+   * Waits up to {@code duration} for a signal on the result topic and asserts it is a success.
+   *
+   * <p>A temporary subscription is created on the result topic for polling. It is deleted again
+   * before this method returns, whether or not a signal was received.
+   *
+   * @throws AssertionError if the received signal is not {@code "SUCCESS"} (the error message is
+   *     the received signal), or if polling does not yield a signal in time
+   * @throws IOException if creating, polling or deleting the subscription fails
+   */
+  public void waitForSuccess(Duration duration) throws IOException {
+    SubscriptionPath resultSubscriptionPath = new SubscriptionPath(String.format(
+        SUBSCRIPTION_FORMAT,
+        pipelineOptions.getProject(),
+        "subscription-" + String.valueOf(ThreadLocalRandom.current().nextLong())));
+
+    pubsub.createSubscription(
+        new TopicPath(resultTopicPath),
+        resultSubscriptionPath,
+        (int) duration.getStandardSeconds());
+
+    String result;
+    try {
+      result = pollForResultForDuration(resultSubscriptionPath, duration);
+    } finally {
+      // Otherwise every call leaves a randomly named subscription behind in the project.
+      pubsub.deleteSubscription(resultSubscriptionPath);
+    }
+
+    if (!"SUCCESS".equals(result)) {
+      throw new AssertionError(result);
+    }
+  }
+
+  /**
+   * Pulls from {@code resultSubscriptionPath} until a message arrives or {@code duration}
+   * elapses, and returns the first received message's payload decoded as UTF-8.
+   *
+   * <p>Received messages are acknowledged. A {@link StatusRuntimeException} from the client is
+   * logged and retried after a short pause. An empty pull means nothing has been published yet,
+   * so polling continues until the deadline.
+   *
+   * @throws AssertionError if no message is received before the deadline
+   */
+  private String pollForResultForDuration(
+      SubscriptionPath resultSubscriptionPath,
+      Duration duration) throws IOException {
+
+    List<PubsubClient.IncomingMessage> result = null;
+    DateTime endPolling = DateTime.now().plus(duration.getMillis());
+
+    do {
+      try {
+        List<PubsubClient.IncomingMessage> pulled =
+            pubsub.pull(DateTime.now().getMillis(), resultSubscriptionPath, 1, false);
+        if (!pulled.isEmpty()) {
+          // Record the message before acking, so an ack failure does not lose the signal.
+          result = pulled;
+          pubsub.acknowledge(
+              resultSubscriptionPath, pulled.stream().map(m -> m.ackId).collect(toList()));
+        }
+      } catch (StatusRuntimeException e) {
+        System.out.println("Error while polling for result: " + e.getStatus());
+        sleep(500);
+      }
+    } while (result == null && DateTime.now().isBefore(endPolling));
+
+    if (result == null) {
+      throw new AssertionError(
+          "Did not receive success in " + duration.getStandardSeconds() + "s");
+    }
+
+    return new String(result.get(0).elementBytes, UTF_8);
+  }
+
+  /**
+   * Sleeps for {@code t} milliseconds, rethrowing an interrupt as an unchecked exception.
+   *
+   * <p>The thread's interrupt status is restored before rethrowing, so code further up the stack
+   * can still observe the interruption.
+   */
+  private void sleep(long t) {
+    try {
+      Thread.sleep(t);
+    } catch (InterruptedException ex) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ex);
+    }
+  }
+
+  /**
+   * {@link PTransform} that validates whether elements seen so far match the success criteria.
+   */
+  static class PublishSuccessWhen<T> extends PTransform<PCollection<? extends 
T>, POutput> {
+    private Coder<T> coder;
+    private SerializableFunction<Set<T>, Boolean> successPredicate;
+    private String resultTopicPath;
+
+    PublishSuccessWhen(
+        Coder<T> coder,
+        SerializableFunction<Set<T>, Boolean> successPredicate,
+        String resultTopicPath) {
+
+      this.coder = coder;
+      this.successPredicate = successPredicate;
+      this.resultTopicPath = resultTopicPath;
+    }
+
+    @Override
+    public POutput expand(PCollection<? extends T> input) {
+      return
+          input
+              // assign a dummy key, need this to keep all events in the same 
state cell
+              .apply(WithKeys.of("dummyKey"))
 
 Review comment:
   What about windowing? You may also want to `Window.into(new GlobalWindows())`. I think a good
amount of this logic also exists in the `PAssert` implementation.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 101503)

> Integration Tests for PubsubIO
> ------------------------------
>
>                 Key: BEAM-4201
>                 URL: https://issues.apache.org/jira/browse/BEAM-4201
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Anton Kedin
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Add integration tests for PubsubIO



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to