[ 
https://issues.apache.org/jira/browse/BEAM-5122?focusedWorklogId=134360&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-134360
 ]

ASF GitHub Bot logged work on BEAM-5122:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 14/Aug/18 00:14
            Start Date: 14/Aug/18 00:14
    Worklog Time Spent: 10m 
      Work Description: akedin closed pull request #6213: [BEAM-5122] Add extra 
synchronization to PubsubJsonIT.testUsesDlq
URL: https://github.com/apache/beam/pull/6213
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
index aa8cc079ecb..e0e16703c41 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/pubsub/PubsubJsonIT.java
@@ -89,7 +89,8 @@
 
   @Rule public transient TestPubsub eventsTopic = TestPubsub.create();
   @Rule public transient TestPubsub dlqTopic = TestPubsub.create();
-  @Rule public transient TestPubsubSignal signal = TestPubsubSignal.create();
+  @Rule public transient TestPubsubSignal resultSignal = 
TestPubsubSignal.create();
+  @Rule public transient TestPubsubSignal dlqSignal = 
TestPubsubSignal.create();
   @Rule public transient TestPipeline pipeline = TestPipeline.create();
 
   /**
@@ -119,18 +120,22 @@ public void testSelectsPayloadContent() throws Exception {
 
     String queryString = "SELECT message.payload.id, message.payload.name from 
message";
 
+    // Prepare messages to send later
     List<PubsubMessage> messages =
         ImmutableList.of(
             message(ts(1), 3, "foo"), message(ts(2), 5, "bar"), message(ts(3), 
7, "baz"));
 
+    // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubJsonTableProvider());
-
     sqlEnv.executeDdl(createTableString);
+
+    // Apply the PTransform to query the pubsub topic
     PCollection<Row> queryOutput = query(sqlEnv, pipeline, queryString);
 
+    // Observe the query results and send success signal after seeing the 
expected messages
     queryOutput.apply(
         "waitForSuccess",
-        signal.signalSuccessWhen(
+        resultSignal.signalSuccessWhen(
             SchemaCoder.of(
                 PAYLOAD_SCHEMA, SerializableFunctions.identity(), 
SerializableFunctions.identity()),
             observedRows ->
@@ -140,13 +145,21 @@ public void testSelectsPayloadContent() throws Exception {
                         row(PAYLOAD_SCHEMA, 5, "bar"),
                         row(PAYLOAD_SCHEMA, 7, "baz")))));
 
-    Supplier<Void> start = signal.waitForStart(Duration.standardMinutes(5));
-    pipeline.begin().apply(signal.signalStart());
+    // Send the start signal to make sure the signaling topic is initialized
+    Supplier<Void> start = 
resultSignal.waitForStart(Duration.standardMinutes(5));
+    pipeline.begin().apply(resultSignal.signalStart());
+
+    // Start the pipeline
     pipeline.run();
+
+    // Wait until got the start response from the signalling topic
     start.get();
 
+    // Start publishing the messages when main pipeline is started and 
signaling topic is ready
     eventsTopic.publish(messages);
-    signal.waitForSuccess(Duration.standardSeconds(60));
+
+    // Poll the signaling topic for success message
+    resultSignal.waitForSuccess(Duration.standardSeconds(60));
   }
 
   @Test
@@ -174,36 +187,69 @@ public void testUsesDlq() throws Exception {
 
     String queryString = "SELECT message.payload.id, message.payload.name from 
message";
 
+    // Prepare messages to send later
     List<PubsubMessage> messages =
         ImmutableList.of(
             message(ts(1), 3, "foo"),
             message(ts(2), 5, "bar"),
             message(ts(3), 7, "baz"),
-            message(ts(4), "{ - }"), // invalid
-            message(ts(5), "{ + }")); // invalid
+            message(ts(4), "{ - }"), // invalid message, will go to DLQ
+            message(ts(5), "{ + }")); // invalid message, will go to DLQ
 
+    // Initialize SQL environment and create the pubsub table
     BeamSqlEnv sqlEnv = BeamSqlEnv.inMemory(new PubsubJsonTableProvider());
-
     sqlEnv.executeDdl(createTableString);
-    query(sqlEnv, pipeline, queryString);
+
+    // Apply the PTransform to query the pubsub topic
+    PCollection<Row> queryOutput = query(sqlEnv, pipeline, queryString);
+
+    // Observe the query results and send success signal after seeing the 
expected messages
+    queryOutput.apply(
+        "waitForSuccess",
+        resultSignal.signalSuccessWhen(
+            SchemaCoder.of(
+                PAYLOAD_SCHEMA, SerializableFunctions.identity(), 
SerializableFunctions.identity()),
+            observedRows ->
+                observedRows.equals(
+                    ImmutableSet.of(
+                        row(PAYLOAD_SCHEMA, 3, "foo"),
+                        row(PAYLOAD_SCHEMA, 5, "bar"),
+                        row(PAYLOAD_SCHEMA, 7, "baz")))));
+
+    // Send the start signal to make sure the signaling topic is initialized
+    Supplier<Void> start = 
resultSignal.waitForStart(Duration.standardMinutes(5));
+    pipeline.begin().apply("signal query results started", 
resultSignal.signalStart());
+
+    // Another PCollection, reads from DLQ
     PCollection<PubsubMessage> dlq =
         pipeline.apply(
             
PubsubIO.readMessagesWithAttributes().fromTopic(dlqTopic.topicPath().getPath()));
 
+    // Observe DLQ contents and send success signal after seeing the expected 
messages
     dlq.apply(
         "waitForDlq",
-        signal.signalSuccessWhen(
+        dlqSignal.signalSuccessWhen(
             PubsubMessageWithAttributesCoder.of(),
             dlqMessages ->
                 containsAll(dlqMessages, message(ts(4), "{ - }"), 
message(ts(5), "{ + }"))));
 
-    Supplier<Void> start = signal.waitForStart(Duration.standardMinutes(5));
-    pipeline.begin().apply(signal.signalStart());
+    // Send the start signal to make sure the signaling topic is initialized
+    Supplier<Void> startDlq = 
dlqSignal.waitForStart(Duration.standardMinutes(5));
+    pipeline.begin().apply("signal DLQ started", dlqSignal.signalStart());
+
+    // Start the pipeline
     pipeline.run();
+
+    // Wait until got the response from the signalling topics
     start.get();
+    startDlq.get();
 
+    // Start publishing the messages when main pipeline is started and 
signaling topics are ready
     eventsTopic.publish(messages);
-    signal.waitForSuccess(Duration.standardSeconds(60));
+
+    // Poll the signaling topic for success message
+    resultSignal.waitForSuccess(Duration.standardMinutes(2));
+    dlqSignal.waitForSuccess(Duration.standardMinutes(2));
   }
 
   @Test
@@ -239,7 +285,8 @@ public void testSQLLimit() throws Exception {
             message(ts(6), 13, "ba4"),
             message(ts(7), 15, "ba5"));
 
-    // We need the default options on the schema to include the project passed 
in for the integration test
+    // We need the default options on the schema to include the project passed 
in for the
+    // integration test
     CalciteConnection connection = connect(pipeline.getOptions(), new 
PubsubJsonTableProvider());
 
     Statement statement = connection.createStatement();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 134360)
    Time Spent: 6.5h  (was: 6h 20m)

> beam_PostCommit_Java_GradleBuild 
> org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonIT.testUsesDlq
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-5122
>                 URL: https://issues.apache.org/jira/browse/BEAM-5122
>             Project: Beam
>          Issue Type: Bug
>          Components: test-failures
>            Reporter: Mikhail Gryzykhin
>            Assignee: Anton Kedin
>            Priority: Critical
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> [https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/1216/testReport/junit/org.apache.beam.sdk.extensions.sql.meta.provider.pubsub/PubsubJsonIT/testUsesDlq/history/]
> The test is flaky: it times out waiting for the success signal on Pub/Sub:
> java.lang.AssertionError: Did not receive signal on 
> projects/apache-beam-testing/subscriptions/result-subscription--6677803195159868432
>  in 60s at 
> org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal.pollForResultForDuration(TestPubsubSignal.java:269)
>  at 
> org.apache.beam.sdk.io.gcp.pubsub.TestPubsubSignal.waitForSuccess(TestPubsubSignal.java:237)
>  at 
> org.apache.beam.sdk.extensions.sql.meta.provider.pubsub.PubsubJsonIT.testUsesDlq(PubsubJsonIT.java:206)
> [https://builds.apache.org/job/beam_PostCommit_Java_GradleBuild/1216/testReport/org.apache.beam.sdk.extensions.sql.meta.provider.pubsub/PubsubJsonIT/testUsesDlq/]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to