[
https://issues.apache.org/jira/browse/BEAM-4652?focusedWorklogId=117745&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-117745
]
ASF GitHub Bot logged work on BEAM-4652:
----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Jun/18 03:34
Start Date: 30/Jun/18 03:34
Worklog Time Spent: 10m
Work Description: kennknowles closed pull request #5788: [BEAM-4652]
Allow PubsubIO to read public data
URL: https://github.com/apache/beam/pull/5788
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance. Because this is a foreign pull request (from a
fork), GitHub would not otherwise show the diff:
diff --git a/build.gradle b/build.gradle
index f6973bdf9e7..cccb4b9c7b4 100644
--- a/build.gradle
+++ b/build.gradle
@@ -166,6 +166,7 @@ task javaPreCommit() {
task javaPostCommit() {
dependsOn ":javaPreCommit"
dependsOn ":beam-runners-google-cloud-dataflow-java:postCommit"
+ dependsOn ":beam-sdks-java-io-google-cloud-platform:postCommit"
dependsOn ":beam-sdks-java-extensions-sql:postCommit"
}
diff --git a/runners/google-cloud-dataflow-java/build.gradle
b/runners/google-cloud-dataflow-java/build.gradle
index aa3714405cf..1aebd3ee720 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -147,6 +147,7 @@ task googleCloudPlatformIntegrationTest(type: Test) {
])
include '**/*IT.class'
+ exclude '**/PubsubReadIT.class'
maxParallelForks 4
classpath = configurations.googleCloudPlatformIntegrationTest
testClassesDirs =
files(project(":beam-sdks-java-io-google-cloud-platform").sourceSets.test.output.classesDirs)
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index b028b9c4fdf..4e1c70aeb9c 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -18,23 +18,29 @@
package org.apache.beam.sdk.extensions.sql.meta.provider.pubsub;
import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
-import java.io.IOException;
import java.io.Serializable;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.JdbcDriver;
@@ -46,8 +52,10 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsub;
import org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal;
+import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.calcite.jdbc.CalciteConnection;
@@ -81,6 +89,14 @@
@Rule public transient TestPubsubSignal signal = TestPubsubSignal.create();
@Rule public transient TestPipeline pipeline = TestPipeline.create();
+ /**
+ * HACK: we need an objectmapper to turn pipelineoptions back into a map. We
need to use
+ * ReflectHelpers to get the extra PipelineOptions.
+ */
+ private static final ObjectMapper MAPPER =
+ new ObjectMapper()
+
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));
+
@Test
public void testSelectsPayloadContent() throws Exception {
String createTableString =
@@ -120,9 +136,13 @@ public void testSelectsPayloadContent() throws Exception {
row(PAYLOAD_SCHEMA, 5, "bar"),
row(PAYLOAD_SCHEMA, 7, "baz")))));
+ Supplier<Void> start = signal.waitForStart(Duration.standardMinutes(5));
+ pipeline.begin().apply(signal.signalStart());
pipeline.run();
+ start.get();
+
eventsTopic.publish(messages);
- signal.waitForSuccess(Duration.standardMinutes(5));
+ signal.waitForSuccess(Duration.standardSeconds(60));
}
@Test
@@ -163,7 +183,8 @@ public void testUsesDlq() throws Exception {
sqlEnv.executeDdl(createTableString);
query(sqlEnv, pipeline, queryString);
PCollection<PubsubMessage> dlq =
-
pipeline.apply(PubsubIO.readMessagesWithAttributes().fromTopic(dlqTopic.topicPath()));
+ pipeline.apply(
+
PubsubIO.readMessagesWithAttributes().fromTopic(dlqTopic.topicPath().getPath()));
dlq.apply(
"waitForDlq",
@@ -172,14 +193,17 @@ public void testUsesDlq() throws Exception {
dlqMessages ->
containsAll(dlqMessages, message(ts(4), "{ - }"),
message(ts(5), "{ + }"))));
+ Supplier<Void> start = signal.waitForStart(Duration.standardMinutes(5));
+ pipeline.begin().apply(signal.signalStart());
pipeline.run();
+ start.get();
eventsTopic.publish(messages);
- signal.waitForSuccess(Duration.standardMinutes(5));
+ signal.waitForSuccess(Duration.standardSeconds(60));
}
@Test
- public void testSQLLimit() throws SQLException, IOException,
InterruptedException {
+ public void testSQLLimit() throws Exception {
String createTableString =
"CREATE TABLE message (\n"
+ "event_timestamp TIMESTAMP, \n"
@@ -211,7 +235,8 @@ public void testSQLLimit() throws SQLException,
IOException, InterruptedExceptio
message(ts(6), 13, "ba4"),
message(ts(7), 15, "ba5"));
- CalciteConnection connection = connect(new PubsubJsonTableProvider());
+ // We need the default options on the schema to include the project passed
in for the integration test
+ CalciteConnection connection = connect(pipeline.getOptions(), new
PubsubJsonTableProvider());
Statement statement = connection.createStatement();
statement.execute(createTableString);
@@ -221,42 +246,62 @@ public void testSQLLimit() throws SQLException,
IOException, InterruptedExceptio
// However, because statement.executeQuery is a blocking call, it has to
be put into a
// seperate thread to execute.
ExecutorService pool = Executors.newFixedThreadPool(1);
- pool.execute(
- new Runnable() {
- @Override
- public void run() {
- try {
- ResultSet resultSet =
- statement.executeQuery("SELECT message.payload.id FROM
message LIMIT 3");
- assertTrue(resultSet.next());
- assertTrue(resultSet.next());
- assertTrue(resultSet.next());
- assertFalse(resultSet.next());
- checked = true;
- } catch (SQLException e) {
- LOG.warn(e.toString());
- }
- }
- });
+ Future<List<String>> queryResult =
+ pool.submit(
+ (Callable)
+ () -> {
+ ResultSet resultSet =
+ statement.executeQuery("SELECT message.payload.id FROM
message LIMIT 3");
+ ImmutableList.Builder<String> result =
ImmutableList.builder();
+ while (resultSet.next()) {
+ result.add(resultSet.getString(1));
+ }
+ return result.build();
+ });
// wait one minute to allow subscription creation.
Thread.sleep(60 * 1000);
eventsTopic.publish(messages);
- // Wait one minute to allow the thread finishes checks.
- Thread.sleep(60 * 1000);
- // verify if the thread has checked returned value from LIMIT query.
- assertTrue(checked);
+ assertThat(queryResult.get().size(), equalTo(3));
pool.shutdown();
}
- private CalciteConnection connect(TableProvider... tableProviders) throws
SQLException {
+ private static String toArg(Object o) {
+ try {
+ String jsonRepr = MAPPER.writeValueAsString(o);
+
+ // String and enums are expected to be unquoted on the command line
+ if (jsonRepr.startsWith("\"") && jsonRepr.endsWith("\"")) {
+ return jsonRepr.substring(1, jsonRepr.length() - 1);
+ } else {
+ return jsonRepr;
+ }
+ } catch (JsonProcessingException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private CalciteConnection connect(PipelineOptions options, TableProvider...
tableProviders)
+ throws SQLException {
+ // HACK: PipelineOptions should expose a prominent method to do this
reliably
+ // The actual options are in the "options" field of the converted map
+ Map<String, Object> optionsMap =
+ (Map<String, Object>) MAPPER.convertValue(pipeline.getOptions(),
Map.class).get("options");
+ Map<String, String> argsMap =
+ optionsMap
+ .entrySet()
+ .stream()
+ .collect(Collectors.toMap(entry -> entry.getKey(), entry ->
toArg(entry.getValue())));
+
InMemoryMetaStore inMemoryMetaStore = new InMemoryMetaStore();
for (TableProvider tableProvider : tableProviders) {
inMemoryMetaStore.registerProvider(tableProvider);
}
Properties info = new Properties();
- info.put(BEAM_CALCITE_SCHEMA, new BeamCalciteSchema(inMemoryMetaStore));
+ BeamCalciteSchema dbSchema = new BeamCalciteSchema(inMemoryMetaStore);
+ dbSchema.getPipelineOptions().putAll(argsMap);
+ info.put(BEAM_CALCITE_SCHEMA, dbSchema);
return (CalciteConnection) INSTANCE.connect(CONNECT_STRING_PREFIX, info);
}
diff --git a/sdks/java/io/google-cloud-platform/build.gradle
b/sdks/java/io/google-cloud-platform/build.gradle
index 65f11aa50ee..0ec567c9a07 100644
--- a/sdks/java/io/google-cloud-platform/build.gradle
+++ b/sdks/java/io/google-cloud-platform/build.gradle
@@ -16,6 +16,8 @@
* limitations under the License.
*/
+import groovy.json.JsonOutput
+
apply plugin: org.apache.beam.gradle.BeamModulePlugin
applyJavaNature(
enableFindbugs: false,
@@ -77,3 +79,32 @@ dependencies {
shadowTest library.java.junit
shadowTest library.java.slf4j_jdk14
}
+
+/**
+ * These are integration tests with the real Pubsub service and the
DirectRunner.
+ */
+task integrationTest(type: Test) {
+ group = "Verification"
+ def gcpProject = project.findProperty('gcpProject') ?: 'apache-beam-testing'
+ def gcpTempRoot = project.findProperty('gcpTempRoot') ?:
'gs://temp-storage-for-end-to-end-tests'
+ systemProperty "beamTestPipelineOptions", JsonOutput.toJson([
+ "--runner=DirectRunner",
+ "--project=${gcpProject}",
+ "--tempRoot=${gcpTempRoot}",
+ ])
+
+ // Disable Gradle cache: these ITs interact with live service that should
always be considered "out of date"
+ outputs.upToDateWhen { false }
+
+ include '**/*IT.class'
+ maxParallelForks 4
+ classpath = sourceSets.test.runtimeClasspath
+ testClassesDirs = sourceSets.test.output.classesDirs
+ useJUnit { }
+}
+
+task postCommit {
+ group = "Verification"
+ description = "Integration tests of GCP connectors using the DirectRunner."
+ dependsOn integrationTest
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
index fe10a4e1d14..5f4027adce5 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java
@@ -309,10 +309,10 @@ public TopicPath apply(PubsubTopic from) {
/** Used to build a {@link ValueProvider} for {@link ProjectPath}. */
private static class ProjectPathTranslator
- implements SerializableFunction<PubsubTopic, ProjectPath> {
+ implements SerializableFunction<PubsubSubscription, ProjectPath> {
@Override
- public ProjectPath apply(PubsubTopic from) {
+ public ProjectPath apply(PubsubSubscription from) {
return PubsubClient.projectPathFromId(from.project);
}
}
@@ -691,11 +691,6 @@ public String toString() {
"Can't set both the topic and the subscription for " + "a
PubsubIO.Read transform");
}
- @Nullable
- ValueProvider<ProjectPath> projectPath =
- getTopicProvider() == null
- ? null
- : NestedValueProvider.of(getTopicProvider(), new
ProjectPathTranslator());
@Nullable
ValueProvider<TopicPath> topicPath =
getTopicProvider() == null
@@ -709,7 +704,7 @@ public String toString() {
PubsubUnboundedSource source =
new PubsubUnboundedSource(
FACTORY,
- projectPath,
+ null /* always get project from runtime PipelineOptions */,
topicPath,
subscriptionPath,
getTimestampAttribute(),
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
index 934f0b79ff4..c1fb325e1a8 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSource.java
@@ -51,6 +51,7 @@
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.ProjectPath;
@@ -1201,7 +1202,6 @@ public void populateDisplayData(Builder builder) {
checkArgument(
(topic == null) != (subscription == null),
"Exactly one of topic and subscription must be given");
- checkArgument((topic == null) == (project == null), "Project must be given
if topic is given");
this.clock = clock;
this.pubsubFactory = checkNotNull(pubsubFactory);
this.project = project;
@@ -1291,13 +1291,26 @@ public boolean getNeedsAttributes() {
}
private SubscriptionPath createRandomSubscription(PipelineOptions options) {
+ TopicPath topicPath = topic.get();
+
+ ProjectPath projectPath;
+ if (project != null) {
+ projectPath = project.get();
+ } else {
+ String projectId = options.as(GcpOptions.class).getProject();
+ checkState(
+ projectId != null,
+ "Cannot create subscription to topic %s because pipeline option
'project' not specified",
+ topicPath);
+ projectPath =
PubsubClient.projectPathFromId(options.as(GcpOptions.class).getProject());
+ }
+
try {
try (PubsubClient pubsubClient =
pubsubFactory.newClient(
timestampAttribute, idAttribute,
options.as(PubsubOptions.class))) {
SubscriptionPath subscriptionPath =
- pubsubClient.createRandomSubscription(
- project.get(), topic.get(), DEAULT_ACK_TIMEOUT_SEC);
+ pubsubClient.createRandomSubscription(projectPath, topicPath,
DEAULT_ACK_TIMEOUT_SEC);
LOG.warn(
"Created subscription {} to topic {}."
+ " Note this subscription WILL NOT be deleted when the
pipeline terminates",
@@ -1306,7 +1319,11 @@ private SubscriptionPath
createRandomSubscription(PipelineOptions options) {
return subscriptionPath;
}
} catch (Exception e) {
- throw new RuntimeException("Failed to create subscription: ", e);
+ throw new RuntimeException(
+ String.format(
+ "Failed to create subscription to topic %s on project %s: %s",
+ topicPath, projectPath, e.getMessage()),
+ e);
}
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
index 67c7c0e5746..f4d05f15c04 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsub.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
@@ -41,14 +42,15 @@
public class TestPubsub implements TestRule {
private static final DateTimeFormatter DATETIME_FORMAT =
DateTimeFormat.forPattern("YYYY-MM-dd-HH-mm-ss-SSS");
- private static final String TOPIC_FORMAT = "projects/%s/topics/%s";
+ private static final String EVENTS_TOPIC_NAME = "events";
private static final String TOPIC_PREFIX = "integ-test-";
private static final String NO_ID_ATTRIBUTE = null;
private static final String NO_TIMESTAMP_ATTRIBUTE = null;
- PubsubClient pubsub;
- private TestPubsubOptions pipelineOptions;
- private String eventsTopicPath;
+ private final TestPubsubOptions pipelineOptions;
+
+ private @Nullable PubsubClient pubsub = null;
+ private @Nullable TopicPath eventsTopicPath = null;
/**
* Creates an instance of this rule.
@@ -93,10 +95,11 @@ private void initializePubsub(Description description)
throws IOException {
pubsub =
PubsubGrpcClient.FACTORY.newClient(
NO_TIMESTAMP_ATTRIBUTE, NO_ID_ATTRIBUTE, pipelineOptions);
- String eventsTopicPathTmp =
- String.format(TOPIC_FORMAT, pipelineOptions.getProject(),
createTopicName(description));
+ TopicPath eventsTopicPathTmp =
+ PubsubClient.topicPathFromName(
+ pipelineOptions.getProject(), createTopicName(description,
EVENTS_TOPIC_NAME));
- pubsub.createTopic(new TopicPath(eventsTopicPathTmp));
+ pubsub.createTopic(eventsTopicPathTmp);
eventsTopicPath = eventsTopicPathTmp;
}
@@ -108,7 +111,7 @@ private void tearDown() throws IOException {
try {
if (eventsTopicPath != null) {
- pubsub.deleteTopic(new TopicPath(eventsTopicPath));
+ pubsub.deleteTopic(eventsTopicPath);
}
} finally {
pubsub.close();
@@ -122,7 +125,7 @@ private void tearDown() throws IOException {
*
* <p>Example:
'TestClassName-testMethodName-2018-12-11-23-32-333-<random-long>'
*/
- static String createTopicName(Description description) throws IOException {
+ static String createTopicName(Description description, String name) throws
IOException {
StringBuilder topicName = new StringBuilder(TOPIC_PREFIX);
if (description.getClassName() != null) {
@@ -139,11 +142,15 @@ static String createTopicName(Description description)
throws IOException {
DATETIME_FORMAT.printTo(topicName, Instant.now());
- return topicName.toString() + "-" +
String.valueOf(ThreadLocalRandom.current().nextLong());
+ return topicName.toString()
+ + "-"
+ + name
+ + "-"
+ + String.valueOf(ThreadLocalRandom.current().nextLong());
}
/** Topic path where events will be published to. */
- public String topicPath() {
+ public TopicPath topicPath() {
return eventsTopicPath;
}
@@ -151,7 +158,7 @@ public String topicPath() {
public void publish(List<PubsubMessage> messages) throws IOException {
List<PubsubClient.OutgoingMessage> outgoingMessages =
messages.stream().map(this::toOutgoingMessage).collect(toList());
- pubsub.publish(new TopicPath(eventsTopicPath), outgoingMessages);
+ pubsub.publish(eventsTopicPath, outgoingMessages);
}
private PubsubClient.OutgoingMessage toOutgoingMessage(PubsubMessage
message) {
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
index 4c34d868fa8..1558c56f08e 100644
---
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/TestPubsubSignal.java
@@ -17,24 +17,30 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;
+import static com.google.common.base.Preconditions.checkState;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.stream.Collectors.toList;
import static org.apache.beam.sdk.io.gcp.pubsub.TestPubsub.createTopicName;
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableSet;
+import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
+import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
-import org.apache.beam.sdk.state.SetState;
+import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
@@ -43,13 +49,17 @@
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.POutput;
import org.joda.time.DateTime;
import org.joda.time.Duration;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Test rule which observes elements of the {@link PCollection} and checks
whether they match the
@@ -58,14 +68,19 @@
* <p>Uses a random temporary Pubsub topic for synchronization.
*/
public class TestPubsubSignal implements TestRule {
- private static final String TOPIC_FORMAT = "projects/%s/topics/%s-result1";
- private static final String SUBSCRIPTION_FORMAT =
"projects/%s/subscriptions/%s";
+ private static final Logger LOG =
LoggerFactory.getLogger(TestPubsubSignal.class);
+ private static final String RESULT_TOPIC_NAME = "result";
+ private static final String RESULT_SUCCESS_MESSAGE = "SUCCESS";
+ private static final String START_TOPIC_NAME = "start";
+ private static final String START_SIGNAL_MESSAGE = "START SIGNAL";
+
private static final String NO_ID_ATTRIBUTE = null;
private static final String NO_TIMESTAMP_ATTRIBUTE = null;
PubsubClient pubsub;
private TestPubsubOptions pipelineOptions;
- private String resultTopicPath;
+ private @Nullable TopicPath resultTopicPath = null;
+ private @Nullable TopicPath startTopicPath = null;
/**
* Creates an instance of this rule.
@@ -113,12 +128,19 @@ private void initializePubsub(Description description)
throws IOException {
// Example topic name:
//
integ-test-TestClassName-testMethodName-2018-12-11-23-32-333-<random-long>-result
- String resultTopicPathTmp =
- String.format(TOPIC_FORMAT, pipelineOptions.getProject(),
createTopicName(description));
+ TopicPath resultTopicPathTmp =
+ PubsubClient.topicPathFromName(
+ pipelineOptions.getProject(), createTopicName(description,
RESULT_TOPIC_NAME));
+ TopicPath startTopicPathTmp =
+ PubsubClient.topicPathFromName(
+ pipelineOptions.getProject(), createTopicName(description,
START_TOPIC_NAME));
- pubsub.createTopic(new TopicPath(resultTopicPathTmp));
+ pubsub.createTopic(resultTopicPathTmp);
+ pubsub.createTopic(startTopicPathTmp);
+ // Set these after successful creation; this signals that they need
teardown
resultTopicPath = resultTopicPathTmp;
+ startTopicPath = startTopicPathTmp;
}
private void tearDown() throws IOException {
@@ -128,7 +150,7 @@ private void tearDown() throws IOException {
try {
if (resultTopicPath != null) {
- pubsub.deleteTopic(new TopicPath(resultTopicPath));
+ pubsub.deleteTopic(resultTopicPath);
}
} finally {
pubsub.close();
@@ -137,6 +159,11 @@ private void tearDown() throws IOException {
}
}
+ /** Outputs a message that the pipeline has started. */
+ public PTransform<PBegin, PDone> signalStart() {
+ return new PublishStart(startTopicPath);
+ }
+
/**
* Outputs a success message when {@code successPredicate} is evaluated to
true.
*
@@ -152,56 +179,99 @@ private void tearDown() throws IOException {
* <p>If {@code successPredicate} throws, then failure will be signaled and
{@link
* #waitForSuccess(Duration)} will unblock.
*/
+ public <T> PTransform<PCollection<? extends T>, POutput> signalSuccessWhen(
+ Coder<T> coder,
+ SerializableFunction<T, String> formatter,
+ SerializableFunction<Set<T>, Boolean> successPredicate) {
+
+ return new PublishSuccessWhen<>(coder, formatter, successPredicate,
resultTopicPath);
+ }
+
+ /**
+ * Invocation of {@link #signalSuccessWhen(Coder, SerializableFunction,
SerializableFunction)}
+ * with {@link Object#toString} as the formatter.
+ */
public <T> PTransform<PCollection<? extends T>, POutput> signalSuccessWhen(
Coder<T> coder, SerializableFunction<Set<T>, Boolean> successPredicate) {
- return new PublishSuccessWhen<>(coder, successPredicate, resultTopicPath);
+ return signalSuccessWhen(coder, T::toString, successPredicate);
+ }
+
+ /**
+ * Future that waits for a start signal for {@code duration}.
+ *
+ * <p>This future must be created before running the pipeline. A
subscription must exist prior to
+ * the start signal being published, which occurs immediately upon pipeline
startup.
+ */
+ public Supplier<Void> waitForStart(Duration duration) throws IOException {
+ SubscriptionPath startSubscriptionPath =
+ PubsubClient.subscriptionPathFromName(
+ pipelineOptions.getProject(),
+ "start-subscription-" +
String.valueOf(ThreadLocalRandom.current().nextLong()));
+
+ pubsub.createSubscription(
+ startTopicPath, startSubscriptionPath, (int)
duration.getStandardSeconds());
+
+ return Suppliers.memoize(
+ () -> {
+ try {
+ String result = pollForResultForDuration(startSubscriptionPath,
duration);
+ checkState(START_SIGNAL_MESSAGE.equals(result));
+ return null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
}
/** Wait for a success signal for {@code duration}. */
public void waitForSuccess(Duration duration) throws IOException {
SubscriptionPath resultSubscriptionPath =
- new SubscriptionPath(
- String.format(
- SUBSCRIPTION_FORMAT,
- pipelineOptions.getProject(),
- "subscription-" +
String.valueOf(ThreadLocalRandom.current().nextLong())));
+ PubsubClient.subscriptionPathFromName(
+ pipelineOptions.getProject(),
+ "result-subscription-" +
String.valueOf(ThreadLocalRandom.current().nextLong()));
pubsub.createSubscription(
- new TopicPath(resultTopicPath),
- resultSubscriptionPath,
- (int) duration.getStandardSeconds());
+ resultTopicPath, resultSubscriptionPath, (int)
duration.getStandardSeconds());
String result = pollForResultForDuration(resultSubscriptionPath, duration);
- if (!"SUCCESS".equals(result)) {
+ if (!RESULT_SUCCESS_MESSAGE.equals(result)) {
throw new AssertionError(result);
}
}
private String pollForResultForDuration(
- SubscriptionPath resultSubscriptionPath, Duration duration) throws
IOException {
+ SubscriptionPath signalSubscriptionPath, Duration duration) throws
IOException {
- List<PubsubClient.IncomingMessage> result = null;
+ List<PubsubClient.IncomingMessage> signal = null;
DateTime endPolling = DateTime.now().plus(duration.getMillis());
do {
try {
- result = pubsub.pull(DateTime.now().getMillis(),
resultSubscriptionPath, 1, false);
+ signal = pubsub.pull(DateTime.now().getMillis(),
signalSubscriptionPath, 1, false);
pubsub.acknowledge(
- resultSubscriptionPath, result.stream().map(m ->
m.ackId).collect(toList()));
+ signalSubscriptionPath, signal.stream().map(m ->
m.ackId).collect(toList()));
break;
} catch (StatusRuntimeException e) {
- System.out.println("Error while polling for result: " + e.getStatus());
+ if (!Status.DEADLINE_EXCEEDED.equals(e.getStatus())) {
+ LOG.warn(
+ "(Will retry) Error while polling {} for signal: {}",
+ signalSubscriptionPath,
+ e.getStatus());
+ }
sleep(500);
}
} while (DateTime.now().isBefore(endPolling));
- if (result == null) {
- throw new AssertionError("Did not receive success in " +
duration.getStandardSeconds() + "s");
+ if (signal == null) {
+ throw new AssertionError(
+ String.format(
+ "Did not receive signal on %s in %ss",
+ signalSubscriptionPath, duration.getStandardSeconds()));
}
- return new String(result.get(0).elementBytes, UTF_8);
+ return new String(signal.get(0).elementBytes, UTF_8);
}
private void sleep(long t) {
@@ -212,18 +282,37 @@ private void sleep(long t) {
}
}
+ /** {@link PTransform} that signals once when the pipeline has started. */
+ static class PublishStart extends PTransform<PBegin, PDone> {
+ private final TopicPath startTopicPath;
+
+ PublishStart(TopicPath startTopicPath) {
+ this.startTopicPath = startTopicPath;
+ }
+
+ @Override
+ public PDone expand(PBegin input) {
+ return input
+ .apply("Start signal", Create.of(START_SIGNAL_MESSAGE))
+ .apply(PubsubIO.writeStrings().to(startTopicPath.getPath()));
+ }
+ }
+
/** {@link PTransform} that for validates whether elements seen so far match
success criteria. */
static class PublishSuccessWhen<T> extends PTransform<PCollection<? extends
T>, POutput> {
- private Coder<T> coder;
- private SerializableFunction<Set<T>, Boolean> successPredicate;
- private String resultTopicPath;
+ private final Coder<T> coder;
+ private final SerializableFunction<T, String> formatter;
+ private final SerializableFunction<Set<T>, Boolean> successPredicate;
+ private final TopicPath resultTopicPath;
PublishSuccessWhen(
Coder<T> coder,
+ SerializableFunction<T, String> formatter,
SerializableFunction<Set<T>, Boolean> successPredicate,
- String resultTopicPath) {
+ TopicPath resultTopicPath) {
this.coder = coder;
+ this.formatter = formatter;
this.successPredicate = successPredicate;
this.resultTopicPath = resultTopicPath;
}
@@ -237,9 +326,9 @@ public POutput expand(PCollection<? extends T> input) {
.apply(WithKeys.of("dummyKey"))
.apply(
"checkAllEventsForSuccess",
- ParDo.of(new StatefulPredicateCheck<>(coder, successPredicate)))
+ ParDo.of(new StatefulPredicateCheck<>(coder, formatter,
successPredicate)))
// signal the success/failure to the result topic
- .apply("publishSuccess",
PubsubIO.writeStrings().to(resultTopicPath));
+ .apply("publishSuccess",
PubsubIO.writeStrings().to(resultTopicPath.getPath()));
}
}
@@ -251,20 +340,27 @@ public POutput expand(PCollection<? extends T> input) {
* "FAILURE".
*/
static class StatefulPredicateCheck<T> extends DoFn<KV<String, ? extends T>,
String> {
+ private final SerializableFunction<T, String> formatter;
private SerializableFunction<Set<T>, Boolean> successPredicate;
// keep all events seen so far in the state cell
- @StateId("seenEvents")
- private final StateSpec<SetState<T>> seenEvents;
+ private static final String SEEN_EVENTS = "seenEvents";
+
+ @StateId(SEEN_EVENTS)
+ private final StateSpec<BagState<T>> seenEvents;
- StatefulPredicateCheck(Coder<T> coder, SerializableFunction<Set<T>,
Boolean> successPredicate) {
- this.seenEvents = StateSpecs.set(coder);
+ StatefulPredicateCheck(
+ Coder<T> coder,
+ SerializableFunction<T, String> formatter,
+ SerializableFunction<Set<T>, Boolean> successPredicate) {
+ this.seenEvents = StateSpecs.bag(coder);
+ this.formatter = formatter;
this.successPredicate = successPredicate;
}
@ProcessElement
public void processElement(
- ProcessContext context, @StateId("seenEvents") SetState<T> seenEvents)
{
+ ProcessContext context, @StateId(SEEN_EVENTS) BagState<T> seenEvents) {
seenEvents.add(context.element().getValue());
ImmutableSet<T> eventsSoFar = ImmutableSet.copyOf(seenEvents.read());
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java
new file mode 100644
index 00000000000..9f98deadd01
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubReadIT.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.pubsub;
+
+import com.google.common.base.Supplier;
+import org.apache.beam.runners.direct.DirectOptions;
+import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration test for PubsubIO. */
+@RunWith(JUnit4.class)
+public class PubsubReadIT {
+
+ @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create();
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+ @Test
+ public void testReadPublicData() throws Exception {
+ // The pipeline will never terminate on its own
+ pipeline.getOptions().as(DirectOptions.class).setBlockOnRun(false);
+
+ PCollection<String> messages =
+ pipeline.apply(
+ PubsubIO.readStrings()
+
.fromTopic("projects/pubsub-public-data/topics/taxirides-realtime"));
+
+ messages.apply(
+ "waitForAnyMessage", signal.signalSuccessWhen(messages.getCoder(),
anyMessages -> true));
+
+ Supplier<Void> start = signal.waitForStart(Duration.standardMinutes(5));
+ pipeline.apply(signal.signalStart());
+ PipelineResult job = pipeline.run();
+ start.get();
+
+ signal.waitForSuccess(Duration.standardSeconds(30));
+ // A runner may not support cancel
+ try {
+ job.cancel();
+ } catch (UnsupportedOperationException exc) {
+ // noop
+ }
+ }
+}
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
Issue Time Tracking
-------------------
Worklog Id: (was: 117745)
Time Spent: 5.5h (was: 5h 20m)
> PubsubIO: create subscription on different project than the topic
> -----------------------------------------------------------------
>
> Key: BEAM-4652
> URL: https://issues.apache.org/jira/browse/BEAM-4652
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp
> Reporter: Kenneth Knowles
> Assignee: Kenneth Knowles
> Priority: Critical
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> If you try to read a public Pub/Sub topic in the DirectRunner, it fails
> with HTTP 403 when PubsubIO tries to create a subscription. This is because
> it tries to create the subscription in the project that owns the shared
> public data set, rather than in the pipeline's own project.
> An example is used in
> https://github.com/googlecodelabs/cloud-dataflow-nyc-taxi-tycoon, whose
> dataset is {{projects/pubsub-public-data/topics/taxirides-realtime}}. I
> discovered that I could not read this topic in the DirectRunner even though
> the codelab works. However, the 1.x codelab does not work in the
> InProcessPipelineRunner either, so this has been broken all along.
> As a result, you cannot read public data, or any other read-only topic, using PubsubIO.
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)