[
https://issues.apache.org/jira/browse/BEAM-4201?focusedWorklogId=102270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-102270
]
ASF GitHub Bot logged work on BEAM-4201:
----------------------------------------
Author: ASF GitHub Bot
Created on: 15/May/18 20:06
Start Date: 15/May/18 20:06
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5347: [BEAM-4201][SQL]
Add integration test for PubsubIO JSON -> SQL
URL: https://github.com/apache/beam/pull/5347
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git a/build.gradle b/build.gradle
index 5fbf00f7873..74e82b886d7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -145,6 +145,7 @@ task javaPreCommit() {
task javaPostCommit() {
dependsOn ":javaPreCommit"
dependsOn ":beam-runners-google-cloud-dataflow-java:postCommit"
+ dependsOn ":beam-sdks-java-extensions-sql:postCommit"
}
task goPreCommit() {
diff --git a/sdks/java/extensions/sql/build.gradle
b/sdks/java/extensions/sql/build.gradle
index e269198904e..2bfb1357b46 100644
--- a/sdks/java/extensions/sql/build.gradle
+++ b/sdks/java/extensions/sql/build.gradle
@@ -1,3 +1,5 @@
+import groovy.json.JsonOutput
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -158,3 +160,26 @@ task runPojoExample(type: JavaExec) {
classpath = sourceSets.main.runtimeClasspath
args = ["--runner=DirectRunner"]
}
+
+
+task integrationTest(type: Test) {
+ group = "Verification"
+ def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+
+ systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+ "--project=${gcpProject}",
+ "--blockOnRun=false"
+ ])
+
+ include '**/*IT.class'
+ maxParallelForks 4
+ classpath =
project(":beam-sdks-java-extensions-sql").sourceSets.test.runtimeClasspath
+ testClassesDirs =
files(project(":beam-sdks-java-extensions-sql").sourceSets.test.output.classesDirs)
+ useJUnit { }
+}
+
+task postCommit {
+ group = "Verification"
+ description = "Various integration tests"
+ dependsOn integrationTest
+}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
new file mode 100644
index 00000000000..af1dfa92dce
--- /dev/null
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.io.Serializable;
+import java.util.List;
+import org.apache.beam.sdk.extensions.sql.RowSqlTypes;
+import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
+import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
+import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
+import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.calcite.sql.SqlExecutableStatement;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for querying Pubsub JSON messages with SQL.
+ */
+@RunWith(JUnit4.class)
+public class PubsubJsonIT implements Serializable {
+
+ private static final Schema PAYLOAD_SCHEMA =
+ RowSqlTypes
+ .builder()
+ .withIntegerField("id")
+ .withVarcharField("name")
+ .build();
+
+ @Rule public transient TestPubsub pubsub = TestPubsub.create();
+ @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create();
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ public void testSelectsPayloadContent() throws Exception {
+ String createTableString =
+ "CREATE TABLE message (\n"
+ + "event_timestamp TIMESTAMP, \n"
+ + "attributes MAP<VARCHAR, VARCHAR>, \n"
+ + "payload ROW< \n"
+ + " id INTEGER, \n"
+ + " name VARCHAR \n"
+ + " > \n"
+ + ") \n"
+ + "TYPE 'pubsub' \n"
+ + "LOCATION '" + pubsub.eventsTopicPath() + "' \n"
+ + "TBLPROPERTIES '{ \"timestampAttributeKey\" : \"ts\" }'";
+
+ String queryString = "SELECT message.payload.id, message.payload.name from
message";
+
+ List<PubsubMessage> messages = ImmutableList.of(
+ message(ts(1), 3, "foo"),
+ message(ts(2), 5, "bar"),
+ message(ts(3), 7, "baz"));
+
+ BeamSqlEnv sqlEnv = newSqlEnv();
+
+ createTable(sqlEnv, createTableString);
+ PCollection<Row> queryOutput = query(sqlEnv, pipeline, queryString);
+
+ queryOutput
+ .apply(
+ "waitForSuccess",
+ signal.signalSuccessWhen(
+ PAYLOAD_SCHEMA.getRowCoder(),
+ observedRows -> observedRows.equals(
+ ImmutableSet.of(
+ row(PAYLOAD_SCHEMA, 3, "foo"),
+ row(PAYLOAD_SCHEMA, 5, "bar"),
+ row(PAYLOAD_SCHEMA, 7, "baz")))));
+
+ pipeline.run();
+ pubsub.publish(messages);
+ signal.waitForSuccess(Duration.standardSeconds(120));
+ }
+
+ private BeamSqlEnv newSqlEnv() {
+ InMemoryMetaStore metaStore = new InMemoryMetaStore();
+ metaStore.registerProvider(new PubsubJsonTableProvider());
+ return new BeamSqlEnv(metaStore);
+ }
+
+ private void createTable(BeamSqlEnv sqlEnv, String statement) throws
SqlParseException {
+ SqlNode sqlNode = sqlEnv.getPlanner().parse(statement);
+ ((SqlExecutableStatement) sqlNode).execute(sqlEnv.getContext());
+ }
+
+ private Row row(Schema schema, Object... values) {
+ return Row.withSchema(schema).addValues(values).build();
+ }
+
+ private PCollection<Row> query(
+ BeamSqlEnv sqlEnv,
+ TestPipeline pipeline,
+ String queryString)
+ throws Exception {
+
+ return sqlEnv.getPlanner().compileBeamPipeline(queryString, pipeline);
+ }
+
+ private PubsubMessage message(Instant timestamp, int id, String name) {
+ return
+ new PubsubMessage(
+ jsonString(id, name).getBytes(UTF_8),
+ ImmutableMap.of("ts", String.valueOf(timestamp.getMillis())));
+ }
+
+ private String jsonString(int id, String name) {
+ return "{ \"id\" : " + id + ", \"name\" : \"" + name + "\" }";
+ }
+
+ private Instant ts(long millis) {
+ return new Instant(millis);
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
new file mode 100644
index 00000000000..731570695ee
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static java.util.stream.Collectors.toList;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.joda.time.DateTime;
+import org.joda.time.Instant;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * Test rule which creates a new topic with randomized name and exposes the
APIs to work with it.
+ *
+ * <p>Deletes topic on shutdown.
+ */
+public class TestPubsub implements TestRule {
+ private static final DateTimeFormatter DATETIME_FORMAT =
+ DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
+ private static final String TOPIC_FORMAT = "projects/%s/topics/%s";
+ private static final String TOPIC_PREFIX = "integ-test-";
+ private static final String NO_ID_ATTRIBUTE = null;
+ private static final String NO_TIMESTAMP_ATTRIBUTE = null;
+
+ PubsubClient pubsub;
+ private TestPubsubOptions pipelineOptions;
+ private String eventsTopicPath;
+
+ /**
+ * Creates an instance of this rule.
+ *
+ * <p>Loads GCP configuration from {@link TestPipelineOptions}.
+ */
+ public static TestPubsub create() {
+ TestPubsubOptions options =
+ TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class);
+ return new TestPubsub(options);
+ }
+
+ private TestPubsub(TestPubsubOptions pipelineOptions) {
+ this.pipelineOptions = pipelineOptions;
+ }
+
+ @Override
+ public Statement apply(Statement base, Description description) {
+ return new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ if (TestPubsub.this.pubsub != null) {
+ throw new AssertionError(
+ "Pubsub client was not shutdown in previous test. "
+ + "Topic path is'" + eventsTopicPath + "'. "
+ + "Current test: " + description.getDisplayName());
+ }
+
+ try {
+ initializePubsub(description);
+ base.evaluate();
+ } finally {
+ tearDown();
+ }
+ }
+ };
+ }
+
+ private void initializePubsub(Description description) throws IOException {
+ pubsub = PubsubGrpcClient.FACTORY.newClient(
+ NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, pipelineOptions);
+ String eventsTopicPathTmp =
+ String.format(TOPIC_FORMAT, pipelineOptions.getProject(),
createTopicName(description));
+
+ pubsub.createTopic(new TopicPath(eventsTopicPathTmp));
+
+ eventsTopicPath = eventsTopicPathTmp;
+ }
+
+ private void tearDown() throws IOException {
+ if (pubsub == null) {
+ return;
+ }
+
+ try {
+ if (eventsTopicPath != null) {
+ pubsub.deleteTopic(new TopicPath(eventsTopicPath));
+ }
+ } finally {
+ pubsub.close();
+ pubsub = null;
+ eventsTopicPath = null;
+ }
+ }
+
+ /**
+ * Generates randomized topic name.
+ *
+ * <p>Example:
+ * 'TestClassName-testMethodName-2018-12-11-23-32-333-<random-long>'
+ */
+ static String createTopicName(Description description) throws IOException {
+ StringBuilder topicName = new StringBuilder(TOPIC_PREFIX);
+
+ if (description.getClassName() != null) {
+ try {
+ topicName
+ .append(Class.forName(description.getClassName()).getSimpleName())
+ .append("-");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ if (description.getMethodName() != null) {
+ topicName.append(description.getMethodName()).append("-");
+ }
+
+ DATETIME_FORMAT.printTo(topicName, Instant.now());
+
+ return topicName.toString() + "-" +
String.valueOf(ThreadLocalRandom.current().nextLong());
+ }
+
+ /**
+ * Topic path where events will be published to.
+ */
+ public String eventsTopicPath() {
+ return eventsTopicPath;
+ }
+
+ /**
+ * Publish messages to {@link #eventsTopicPath()}.
+ */
+ public void publish(List<PubsubMessage> messages) throws IOException {
+ List<PubsubClient.OutgoingMessage> outgoingMessages =
+ messages
+ .stream()
+ .map(this::toOutgoingMessage)
+ .collect(toList());
+ pubsub.publish(new TopicPath(eventsTopicPath), outgoingMessages);
+ }
+
+ private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage
message) {
+ return new PubsubClient.OutgoingMessage(
+ message.getPayload(),
+ message.getAttributeMap(),
+ DateTime.now().getMillis(),
+ null);
+ }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubOptions.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubOptions.java
new file mode 100644
index 00000000000..aab2a58b464
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubOptions.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+
+/**
+ * {@link PipelineOptions} for {@link TestPubsub}.
+ */
+public interface TestPubsubOptions extends TestPipelineOptions, PubsubOptions,
GcpOptions {
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
new file mode 100644
index 00000000000..274f0c0201a
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static java.util.stream.Collectors.toList;
+import static org.apache.beam.sdk.io.gcp.pubsub.TestPubsub.createTopicName;
+
+import com.google.common.collect.ImmutableSet;
+import io.grpc.StatusRuntimeException;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
+import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
+import org.apache.beam.sdk.state.SetState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.WithKeys;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.joda.time.DateTime;
+import org.joda.time.Duration;
+import org.junit.rules.TestRule;
+import org.junit.runner.Description;
+import org.junit.runners.model.Statement;
+
+/**
+ * Test rule which observes elements of the {@link PCollection} and checks
whether they
+ * match the success criteria.
+ *
+ * <p>Uses a random temporary Pubsub topic for synchronization.
+ */
+public class TestPubsubSignal implements TestRule {
+ private static final String TOPIC_FORMAT = "projects/%s/topics/%s-result1";
+ private static final String SUBSCRIPTION_FORMAT =
"projects/%s/subscriptions/%s";
+ private static final String NO_ID_ATTRIBUTE = null;
+ private static final String NO_TIMESTAMP_ATTRIBUTE = null;
+
+ PubsubClient pubsub;
+ private TestPubsubOptions pipelineOptions;
+ private String resultTopicPath;
+
+ /**
+ * Creates an instance of this rule.
+ *
+ * <p>Loads GCP configuration from {@link TestPipelineOptions}.
+ */
+ public static TestPubsubSignal create() {
+ TestPubsubOptions options =
+ TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class);
+ return new TestPubsubSignal(options);
+ }
+
+ private TestPubsubSignal(TestPubsubOptions pipelineOptions) {
+ this.pipelineOptions = pipelineOptions;
+ }
+
+ @Override
+ public Statement apply(Statement base, Description description) {
+ return new Statement() {
+ @Override
+ public void evaluate() throws Throwable {
+ if (TestPubsubSignal.this.pubsub != null) {
+ throw new AssertionError(
+ "Pubsub client was not shutdown in previous test. "
+ + "Topic path is'" + resultTopicPath + "'. "
+ + "Current test: " + description.getDisplayName());
+ }
+
+ try {
+ initializePubsub(description);
+ base.evaluate();
+ } finally {
+ tearDown();
+ }
+ }
+ };
+ }
+
+ private void initializePubsub(Description description) throws IOException {
+ pubsub = PubsubGrpcClient.FACTORY.newClient(
+ NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, pipelineOptions);
+
+ // Example topic name:
+ //
integ-test-TestClassName-testMethodName-2018-12-11-23-32-333-<random-long>-result
+ String resultTopicPathTmp =
+ String.format(TOPIC_FORMAT, pipelineOptions.getProject(),
createTopicName(description));
+
+ pubsub.createTopic(new TopicPath(resultTopicPathTmp));
+
+ resultTopicPath = resultTopicPathTmp;
+ }
+
+ private void tearDown() throws IOException {
+ if (pubsub == null) {
+ return;
+ }
+
+ try {
+ if (resultTopicPath != null) {
+ pubsub.deleteTopic(new TopicPath(resultTopicPath));
+ }
+ } finally {
+ pubsub.close();
+ pubsub = null;
+ resultTopicPath = null;
+ }
+ }
+
+ /**
+ * Outputs a success message when {@code successPredicate} is evaluated to
true.
+ *
+ * <p>{@code successPredicate} is a {@link SerializableFunction} that
+ * accepts a set of currently captured events and returns true when the set
satisfies the success
+ * criteria.
+ *
+ * <p>If {@code successPredicate} is evaluated to false, then it will be
re-evaluated when next
+ * event becomes available.
+ *
+ * <p>If {@code successPredicate} is evaluated to true, then a success will
be signaled and
+ * {@link #waitForSuccess(Duration)} will unblock.
+ *
+ * <p>If {@code successPredicate} throws, then failure will be signaled and
+ * {@link #waitForSuccess(Duration)} will unblock.
+ */
+ public <T> PTransform<PCollection<? extends T>, POutput> signalSuccessWhen(
+ Coder<T> coder,
+ SerializableFunction<Set<T>, Boolean> successPredicate) {
+
+ return new PublishSuccessWhen<>(coder, successPredicate, resultTopicPath);
+ }
+
+ /**
+ * Wait for a success signal for {@code duration}.
+ */
+ public void waitForSuccess(Duration duration) throws IOException {
+ SubscriptionPath resultSubscriptionPath = new
SubscriptionPath(String.format(
+ SUBSCRIPTION_FORMAT,
+ pipelineOptions.getProject(),
+ "subscription-" +
String.valueOf(ThreadLocalRandom.current().nextLong())));
+
+ pubsub.createSubscription(
+ new TopicPath(resultTopicPath),
+ resultSubscriptionPath,
+ (int) duration.getStandardSeconds());
+
+ String result = pollForResultForDuration(resultSubscriptionPath, duration);
+
+ if (!"SUCCESS".equals(result)) {
+ throw new AssertionError(result);
+ }
+ }
+
+ private String pollForResultForDuration(
+ SubscriptionPath resultSubscriptionPath,
+ Duration duration) throws IOException {
+
+ List<PubsubClient.IncomingMessage> result = null;
+ DateTime endPolling = DateTime.now().plus(duration.getMillis());
+
+ do {
+ try {
+ result = pubsub.pull(DateTime.now().getMillis(),
resultSubscriptionPath, 1, false);
+ pubsub.acknowledge(
+ resultSubscriptionPath, result.stream().map(m ->
m.ackId).collect(toList()));
+ break;
+ } catch (StatusRuntimeException e) {
+ System.out.println("Error while polling for result: " + e.getStatus());
+ sleep(500);
+ }
+ } while (DateTime.now().isBefore(endPolling));
+
+ if (result == null) {
+ throw new AssertionError(
+ "Did not receive success in " + duration.getStandardSeconds() + "s");
+ }
+
+ return new String(result.get(0).elementBytes, UTF_8);
+ }
+
+ private void sleep(long t) {
+ try {
+ Thread.sleep(t);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ /**
+ * {@link PTransform} that for validates whether elements seen so far match
success criteria.
+ */
+ static class PublishSuccessWhen<T> extends PTransform<PCollection<? extends
T>, POutput> {
+ private Coder<T> coder;
+ private SerializableFunction<Set<T>, Boolean> successPredicate;
+ private String resultTopicPath;
+
+ PublishSuccessWhen(
+ Coder<T> coder,
+ SerializableFunction<Set<T>, Boolean> successPredicate,
+ String resultTopicPath) {
+
+ this.coder = coder;
+ this.successPredicate = successPredicate;
+ this.resultTopicPath = resultTopicPath;
+ }
+
+ @Override
+ public POutput expand(PCollection<? extends T> input) {
+ return
+ input
+ // assign a dummy key and global window,
+ // this is needed to accumulate all observed events in the same
state cell
+ .apply(Window.into(new GlobalWindows()))
+ .apply(WithKeys.of("dummyKey"))
+ .apply(
+ "checkAllEventsForSuccess",
+ ParDo.of(new StatefulPredicateCheck<>(coder,
successPredicate)))
+ // signal the success/failure to the result topic
+ .apply(
+ "publishSuccess",
+ PubsubIO.writeStrings().to(resultTopicPath));
+ }
+ }
+
+ /**
+ * Stateful {@link DoFn} which caches the elements it sees and checks
whether they satisfy
+ * the predicate.
+ *
+ * <p>When predicate is satisfied outputs "SUCCESS". If predicate throws
execption, outputs
+ * "FAILURE".
+ */
+ static class StatefulPredicateCheck<T> extends DoFn<KV<String, ? extends T>,
String> {
+ private SerializableFunction<Set<T>, Boolean> successPredicate;
+ // keep all events seen so far in the state cell
+
+ @StateId("seenEvents")
+ private final StateSpec<SetState<T>> seenEvents;
+
+ StatefulPredicateCheck(
+ Coder<T> coder,
+ SerializableFunction<Set<T>, Boolean> successPredicate) {
+ this.seenEvents = StateSpecs.set(coder);
+ this.successPredicate = successPredicate;
+ }
+
+ @ProcessElement
+ public void processElement(
+ ProcessContext context,
+ @StateId("seenEvents") SetState<T> seenEvents) {
+
+ seenEvents.add(context.element().getValue());
+ ImmutableSet<T> eventsSoFar = ImmutableSet.copyOf(seenEvents.read());
+
+ // check if all elements seen so far satisfy the success predicate
+ try {
+ if (successPredicate.apply(eventsSoFar)) {
+ context.output("SUCCESS");
+ }
+ } catch (Throwable e) {
+ context.output("FAILURE: " + e.getMessage());
+ }
+ }
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 102270)
Time Spent: 5h (was: 4h 50m)
> Integration Tests for PubsubIO
> ------------------------------
>
> Key: BEAM-4201
> URL: https://issues.apache.org/jira/browse/BEAM-4201
> Project: Beam
> Issue Type: Improvement
> Components: dsl-sql
> Reporter: Anton Kedin
> Assignee: Anton Kedin
> Priority: Major
> Time Spent: 5h
> Remaining Estimate: 0h
>
> Add integration tests for PubsubIO
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)