[
https://issues.apache.org/jira/browse/BEAM-5122?focusedWorklogId=134360&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134360
]
ASF GitHub Bot logged work on BEAM-5122:
----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Aug/18 00:14
Start Date: 14/Aug/18 00:14
Worklog Time Spent: 10m
Work Description: akedin closed pull request #6213: [BEAM-5122] Add extra
synchronization to PubsubJsonIT.testUsesDlq
URL: https://github.com/apache/beam/pull/6213
This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:
As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index aa8cc079ecb..e0e16703c41 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -89,7 +89,8 @@
@Rule public transient TestPubsub eventsTopic = TestPubsub.create();
@Rule public transient TestPubsub dlqTopic = TestPubsub.create();
- @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create();
+ @Rule public transient TestPubsubSignal resultSignal =
TestPubsubSignal.create();
+ @Rule public transient TestPubsubSignal dlqSignal =
TestPubsubSignal.create();
@Rule public transient TestPipeline pipeline = TestPipeline.create();
/**
@@ -119,18 +120,22 @@ public void testSelectsPayloadContent() throws Exception {
String queryString = "SELECT message.payload.id, message.payload.name from
message";
+ // Prepare messages to send later
List<PubsubMessage> messages =
ImmutableList.of(
message(ts(1), 3, "foo"), message(ts(2), 5, "bar"), message(ts(3),
7, "baz"));
+ // Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubJsonTableProvider());
-
sqlEnv.executeDdl(createTableString);
+
+ // Apply the PTransform to query the pubsub topic
PCollection<Row> queryOutput = query(sqlEnv, pipeline, queryString);
+ // Observe the query results and send success signal after seeing the
expected messages
queryOutput.apply(
"waitForSuccess",
- signal.signalSuccessWhen(
+ resultSignal.signalSuccessWhen(
SchemaCoder.of(
PAYLOAD_SCHEMA, SerializableFunctions.identity(),
SerializableFunctions.identity()),
observedRows ->
@@ -140,13 +145,21 @@ public void testSelectsPayloadContent() throws Exception {
row(PAYLOAD_SCHEMA, 5, "bar"),
row(PAYLOAD_SCHEMA, 7, "baz")))));
- Supplier<Void> start = signal.waitForStart(Duration.standardMinutes(5));
- pipeline.begin().apply(signal.signalStart());
+ // Send the start signal to make sure the signaling topic is initialized
+ Supplier<Void> start =
resultSignal.waitForStart(Duration.standardMinutes(5));
+ pipeline.begin().apply(resultSignal.signalStart());
+
+ // Start the pipeline
pipeline.run();
+
+ // Wait until got the start response from the signalling topic
start.get();
+ // Start publishing the messages when main pipeline is started and
signaling topic is ready
eventsTopic.publish(messages);
- signal.waitForSuccess(Duration.standardSeconds(60));
+
+ // Poll the signaling topic for success message
+ resultSignal.waitForSuccess(Duration.standardSeconds(60));
}
@Test
@@ -174,36 +187,69 @@ public void testUsesDlq() throws Exception {
String queryString = "SELECT message.payload.id, message.payload.name from
message";
+ // Prepare messages to send later
List<PubsubMessage> messages =
ImmutableList.of(
message(ts(1), 3, "foo"),
message(ts(2), 5, "bar"),
message(ts(3), 7, "baz"),
- message(ts(4), "{ - }"), // invalid
- message(ts(5), "{ + }")); // invalid
+ message(ts(4), "{ - }"), // invalid message, will go to DLQ
+ message(ts(5), "{ + }")); // invalid message, will go to DLQ
+ // Initialize SQL environment and create the pubsub table
BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubJsonTableProvider());
-
sqlEnv.executeDdl(createTableString);
- query(sqlEnv, pipeline, queryString);
+
+ // Apply the PTransform to query the pubsub topic
+ PCollection<Row> queryOutput = query(sqlEnv, pipeline, queryString);
+
+ // Observe the query results and send success signal after seeing the
expected messages
+ queryOutput.apply(
+ "waitForSuccess",
+ resultSignal.signalSuccessWhen(
+ SchemaCoder.of(
+ PAYLOAD_SCHEMA, SerializableFunctions.identity(),
SerializableFunctions.identity()),
+ observedRows ->
+ observedRows.equals(
+ ImmutableSet.of(
+ row(PAYLOAD_SCHEMA, 3, "foo"),
+ row(PAYLOAD_SCHEMA, 5, "bar"),
+ row(PAYLOAD_SCHEMA, 7, "baz")))));
+
+ // Send the start signal to make sure the signaling topic is initialized
+ Supplier<Void> start =
resultSignal.waitForStart(Duration.standardMinutes(5));
+ pipeline.begin().apply("signal query results started",
resultSignal.signalStart());
+
+ // Another PCollection, reads from DLQ
PCollection<PubsubMessage> dlq =
pipeline.apply(
PubsubIO.readMessagesWithAttributes().fromTopic(dlqTopic.topicPath().getPath()));
+ // Observe DLQ contents and send success signal after seeing the expected
messages
dlq.apply(
"waitForDlq",
- signal.signalSuccessWhen(
+ dlqSignal.signalSuccessWhen(
PubsubMessageWithAttributesCoder.of(),
dlqMessages ->
containsAll(dlqMessages, message(ts(4), "{ - }"),
message(ts(5), "{ + }"))));
- Supplier<Void> start = signal.waitForStart(Duration.standardMinutes(5));
- pipeline.begin().apply(signal.signalStart());
+ // Send the start signal to make sure the signaling topic is initialized
+ Supplier<Void> startDlq =
dlqSignal.waitForStart(Duration.standardMinutes(5));
+ pipeline.begin().apply("signal DLQ started", dlqSignal.signalStart());
+
+ // Start the pipeline
pipeline.run();
+
+ // Wait until got the response from the signalling topics
start.get();
+ startDlq.get();
+ // Start publishing the messages when main pipeline is started and
signaling topics are ready
eventsTopic.publish(messages);
- signal.waitForSuccess(Duration.standardSeconds(60));
+
+ // Poll the signaling topic for success message
+ resultSignal.waitForSuccess(Duration.standardMinutes(2));
+ dlqSignal.waitForSuccess(Duration.standardMinutes(2));
}
@Test
@@ -239,7 +285,8 @@ public void testSQLLimit() throws Exception {
message(ts(6), 13, "ba4"),
message(ts(7), 15, "ba5"));
- // We need the default options on the schema to include the project passed
in for the integration test
+ // We need the default options on the schema to include the project passed
in for the
+ // integration test
CalciteConnection connection = connect(pipeline.getOptions(), new
PubsubJsonTableProvider());
Statement statement = connection.createStatement();
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 134360)
Time Spent: 6.5h (was: 6h 20m)
> beam_PostCommit_Java_GradleBuild
> org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonIT.testUsesDlq
> -----------------------------------------------------------------------------------------------------------------
>
> Key: BEAM-5122
> URL: https://issues.apache.org/jira/browse/BEAM-5122
> Project: Beam
> Issue Type: Bug
> Components: test-failures
> Reporter: Mikhail Gryzykhin
> Assignee: Anton Kedin
> Priority: Critical
> Time Spent: 6.5h
> Remaining Estimate: 0h
>
> [https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/1216/testReport/junit/org.apache.beam.sdk.extensions.sql.meta.provider.pubsub/PubsubJsonIT/testUsesDlq/history/]
> Test flakes with timeout of getting update on pubsub:
> java.lang.AssertionError: Did not receive signal on
> projects/apache-beam-testing/subscriptions/result-subscription--6677803195159868432
> in 60s at
> org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal.pollForResultForDuration(TestPubsubSignal.java:269)
> at
> org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal.waitForSuccess(TestPubsubSignal.java:237)
> at
> org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonIT.testUsesDlq(PubsubJsonIT.java:206)
> [https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/1216/testReport/org.apache.beam.sdk.extensions.sql.meta.provider.pubsub/PubsubJsonIT/testUsesDlq/]
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)