[ 
https://issues.apache.org/jira/browse/BEAM-4201?focusedWorklogId=101502&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-101502
 ]

ASF GitHub Bot logged work on BEAM-4201:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/May/18 02:33
            Start Date: 13/May/18 02:33
    Worklog Time Spent: 10m 
      Work Description: kennknowles commented on a change in pull request 
#5347: [BEAM-4201][SQL] Add integration test for PubsubIO JSON -> SQL
URL: https://github.com/apache/beam/pull/5347#discussion_r187787448
 
 

 ##########
 File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
 ##########
 @@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * Test rule which creates a new topic with randomized name and exposed a 
method to publishto it.
+ *
+ * <p>Deletes topic on shutdown.
+ */
+public class TestPubsub implements TestRule {
+  private static final DateTimeFormatter DATETIME_FORMAT =
+      DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
+  private static final String TOPIC_FORMAT = "projects/%s/topics/%s";
+  private static final String TOPIC_PREFIX = "integ-test-";
+  private static final String NO_ID_ATTRIBUTE = null;
+  private static final String NO_TIMESTAMP_ATTRIBUTE = null;
+
+  PubsubClient pubsub;
+  private TestPubsubOptions pipelineOptions;
+  private String eventsTopicPath;
+
+  /**
+   * Creates an instance of this rule.
+   *
+   * <p>Loads GCP configuration from {@link TestPipelineOptions}.
+   */
+  public static TestPubsub create() {
+    TestPubsubOptions options =
+        TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class);
+    return new TestPubsub(options);
+  }
+
+  private TestPubsub(TestPubsubOptions pipelineOptions) {
+    this.pipelineOptions = pipelineOptions;
+  }
+
+  @Override
+  public Statement apply(Statement base, Description description) {
+    return new Statement() {
+      @Override
+      public void evaluate() throws Throwable {
+        if (TestPubsub.this.pubsub != null) {
+          throw new AssertionError(
+              "Pubsub client was not shutdown in previous test. "
+              + "Topic path is'" + eventsTopicPath + "'. "
+              + "Current test: " + description.getDisplayName());
+        }
+
+        try {
+          initializePubsub(description);
+          base.evaluate();
+        } finally {
+          tearDown();
+        }
+      }
+    };
+  }
+
+  private void initializePubsub(Description description) throws IOException {
+    pubsub = PubsubGrpcClient.FACTORY.newClient(
+        NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, pipelineOptions);
+    String eventsTopicPathTmp =
+        String.format(TOPIC_FORMAT, pipelineOptions.getProject(), 
createTopicName(description));
+
+    pubsub.createTopic(new TopicPath(eventsTopicPathTmp));
+
+    eventsTopicPath = eventsTopicPathTmp;
+  }
+
+  private void tearDown() throws IOException {
+    if (pubsub == null) {
+      return;
+    }
+
+    try {
+      if (eventsTopicPath != null) {
+        pubsub.deleteTopic(new TopicPath(eventsTopicPath));
+      }
+    } finally {
+      pubsub.close();
+      pubsub = null;
+      eventsTopicPath = null;
+    }
+  }
+
+  /**
+   * Generates randomized topic name.
+   *
+   * <p>Example:
+   * 'TestClassName-testMethodName-2018-12-11-23-32-333-&lt;random-long&gt;'
+   */
+  static String createTopicName(Description description) throws IOException {
+    StringBuilder topicName = new StringBuilder(TOPIC_PREFIX);
+
+    if (description.getClassName() != null) {
+      try {
+        topicName
+            .append(Class.forName(description.getClassName()).getSimpleName())
+            .append("-");
+      } catch (ClassNotFoundException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    if (description.getMethodName() != null) {
+      topicName.append(description.getMethodName()).append("-");
+    }
+
+    DATETIME_FORMAT.printTo(topicName, Instant.now());
+
+    return topicName.toString() + "-" + 
String.valueOf(ThreadLocalRandom.current().nextLong());
+  }
+
+  /**
+   * Topic path where events will be published to.
+   */
+  public String evevntsTopicPath() {
 
 Review comment:
   typo: evevnts

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 101502)

> Integration Tests for PubsubIO
> ------------------------------
>
>                 Key: BEAM-4201
>                 URL: https://issues.apache.org/jira/browse/BEAM-4201
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Anton Kedin
>            Assignee: Anton Kedin
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Add integration tests for PubsubIO



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to