This is an automated email from the ASF dual-hosted git repository.
chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 81d7cbe [BEAM-9779] Patch HL7v2IOWriteIT Flakiness (#11450)
81d7cbe is described below
commit 81d7cbe53364da65329069152d9d7a66b21df278
Author: Jacob Ferriero <[email protected]>
AuthorDate: Mon Apr 27 14:48:08 2020 -0700
[BEAM-9779] Patch HL7v2IOWriteIT Flakiness (#11450)
* Patches for HL7v2IO
* Use TestPipeline in ITs
* Drop schematized data before calling message ingest (should be output
only) to help pipelines that read/write from/to two HL7v2 stores
* Make HL7v2MessageCoder constructor public
* block on run
* add sleep to avoid flakiness due to asynchronous HL7v2 indexing
* E2E integration test
* fix merge issue
* reconcile double sleeping
* improve error handling
* improve error handling
* fix docs typo
* add latency distribution metrics
* remove unused imports
* ingest only data and labels
* fix comment
* call spliterator directly, use page size 1000
* output elements more eagerly in ListHL7v2MessagesFn
* eagerly emit data from early pages
* Optimization of Listing and Stabilization of ITs
* allow HL7v2 Message listing to emit early panes rather than waiting on
pagination of all list results
* add exponential backoff (EBO) on HL7v2 Message listing reaching a certain expected length in
ITs to account for async indexing BEAM-9779
* revert unrelated changes
* add back test
* Add constant for HL7v2 indexing timeout minutes
* Add constant for HL7v2 indexing timeout minutes
* fix checkstyle
---
.../beam/sdk/io/gcp/healthcare/HL7v2IOReadIT.java | 63 +-------------------
.../sdk/io/gcp/healthcare/HL7v2IOTestUtil.java | 67 ++++++++++++++++++----
.../beam/sdk/io/gcp/healthcare/HL7v2IOWriteIT.java | 33 +++++++----
3 files changed, 79 insertions(+), 84 deletions(-)
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOReadIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOReadIT.java
index 1b41223..fb51b08 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOReadIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOReadIT.java
@@ -27,10 +27,8 @@ import static org.junit.Assert.assertFalse;
import java.io.IOException;
import java.security.SecureRandom;
-import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import
org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.ListHL7v2MessageIDs;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Count;
@@ -39,6 +37,7 @@ import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -54,6 +53,7 @@ public class HL7v2IOReadIT {
+ "_"
+ (new SecureRandom().nextInt(32))
+ "_read_it";
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
@BeforeClass
public static void createHL7v2tore() throws IOException {
@@ -87,36 +87,6 @@ public class HL7v2IOReadIT {
}
@Test
- public void testHL7v2IORead() throws Exception {
- // Should read all messages.
- Pipeline pipeline = Pipeline.create();
- HL7v2IO.Read.Result result =
- pipeline
- .apply(
- new ListHL7v2MessageIDs(
- Collections.singletonList(
- healthcareDataset + "/hl7V2Stores/" +
HL7V2_STORE_NAME)))
- .apply(HL7v2IO.getAll());
- PCollection<Long> numReadMessages =
- result.getMessages().setCoder(new
HL7v2MessageCoder()).apply(Count.globally());
- PAssert.thatSingleton(numReadMessages).isEqualTo((long) MESSAGES.size());
- PAssert.that(result.getFailedReads()).empty();
-
- PAssert.that(result.getMessages())
- .satisfies(
- input -> {
- for (HL7v2Message elem : input) {
- assertFalse(elem.getName().isEmpty());
- assertFalse(elem.getData().isEmpty());
- assertFalse(elem.getMessageType().isEmpty());
- }
- return null;
- });
-
- pipeline.run();
- }
-
- @Test
public void testHL7v2IO_ListHL7v2Messages() throws Exception {
// Should read all messages.
Pipeline pipeline = Pipeline.create();
@@ -164,33 +134,4 @@ public class HL7v2IOReadIT {
pipeline.run();
}
-
- @Test
- public void testHL7v2IORead_filtered() throws Exception {
- final String adtFilter = "messageType = \"ADT\"";
- // Should read only messages matching the filter.
- Pipeline pipeline = Pipeline.create();
- HL7v2IO.Read.Result result =
- pipeline
- .apply(
- new ListHL7v2MessageIDs(
- Collections.singletonList(
- healthcareDataset + "/hl7V2Stores/" +
HL7V2_STORE_NAME),
- adtFilter))
- .apply(HL7v2IO.getAll());
- PCollection<Long> numReadMessages =
- result.getMessages().setCoder(new
HL7v2MessageCoder()).apply(Count.globally());
- PAssert.thatSingleton(numReadMessages).isEqualTo(NUM_ADT);
- PAssert.that(result.getFailedReads()).empty();
-
- PAssert.that(result.getMessages())
- .satisfies(
- input -> {
- for (HL7v2Message elem : input) {
- assertEquals("ADT", elem.getMessageType());
- }
- return null;
- });
- pipeline.run();
- }
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java
index c42d6b1..fc56280 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOTestUtil.java
@@ -18,19 +18,25 @@
package org.apache.beam.sdk.io.gcp.healthcare;
import com.google.api.client.util.Base64;
+import com.google.api.client.util.Sleeper;
import com.google.api.services.healthcare.v1beta1.model.Message;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
+import
org.apache.beam.sdk.io.gcp.healthcare.HttpHealthcareApiClient.HL7v2MessagePages;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
class HL7v2IOTestUtil {
+ public static final long HL7V2_INDEXING_TIMEOUT_MINUTES = 10L;
/** Google Cloud Healthcare Dataset in Apache Beam integration test project.
*/
public static final String HEALTHCARE_DATASET_TEMPLATE =
"projects/%s/locations/us-central1/datasets/apache-beam-integration-testing";
@@ -81,20 +87,58 @@ class HL7v2IOTestUtil {
/** Clear all messages from the HL7v2 store. */
static void deleteAllHL7v2Messages(HealthcareApiClient client, String
hl7v2Store)
throws IOException {
- for (String msgId :
- client
- .getHL7v2MessageStream(hl7v2Store)
- .map(HL7v2Message::getName)
- .collect(Collectors.toList())) {
- client.deleteHL7v2Message(msgId);
+ for (List<HL7v2Message> page : new HL7v2MessagePages(client, hl7v2Store)) {
+ for (String msgId :
page.stream().map(HL7v2Message::getName).collect(Collectors.toList())) {
+ client.deleteHL7v2Message(msgId);
+ }
+ }
+ }
+
+ /** Utiliy for waiting on HL7v2 Store indexing to be complete see BEAM-9779.
*/
+ public static void waitForHL7v2Indexing(
+ HealthcareApiClient client, String hl7v2Store, long expectedNumMessages,
Duration timeout)
+ throws InterruptedException, TimeoutException {
+
+ Instant start = Instant.now();
+ long sleepMs = 50;
+ long numListedMessages = 0;
+ while (new Duration(start, Instant.now()).isShorterThan(timeout)) {
+ numListedMessages = 0;
+ // count messages in HL7v2 Store.
+ for (List<HL7v2Message> page :
+ new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store)) {
+ numListedMessages += page.size();
+ }
+ if (numListedMessages == expectedNumMessages) {
+ return;
+ }
+ // exponential backoff.
+ sleepMs *= 2;
+ // exit if next sleep will violate timeout
+ if (new Duration(start,
Instant.now()).plus(sleepMs).isShorterThan(timeout)) {
+ Sleeper.DEFAULT.sleep(sleepMs);
+ } else {
+ throw new TimeoutException(
+ String.format(
+ "Timed out waiting for %s to reach %s messages. last list
request returned %s messages.",
+ hl7v2Store, expectedNumMessages, numListedMessages));
+ }
}
}
/** Populate the test messages into the HL7v2 store. */
- static void writeHL7v2Messages(HealthcareApiClient client, String
hl7v2Store) throws IOException {
+ static void writeHL7v2Messages(HealthcareApiClient client, String hl7v2Store)
+ throws IOException, InterruptedException, TimeoutException {
for (HL7v2Message msg : MESSAGES) {
client.createHL7v2Message(hl7v2Store, msg.toModel());
}
+ // [BEAM-9779] HL7v2 indexing is asyncronous. Block until indexing
completes to stabilize this
+ // IT.
+ HL7v2IOTestUtil.waitForHL7v2Indexing(
+ client,
+ hl7v2Store,
+ MESSAGES.size(),
+ Duration.standardMinutes(HL7V2_INDEXING_TIMEOUT_MINUTES));
}
/**
@@ -170,10 +214,11 @@ class HL7v2IOTestUtil {
public void listMessages(ProcessContext context) throws IOException {
String hl7v2Store = context.element();
// Output all elements of all pages.
- this.client
- .getHL7v2MessageStream(hl7v2Store, this.filter)
- .map(HL7v2Message::getName)
- .forEach(context::output);
+ HttpHealthcareApiClient.HL7v2MessagePages pages =
+ new HttpHealthcareApiClient.HL7v2MessagePages(client, hl7v2Store,
this.filter);
+ for (List<HL7v2Message> page : pages) {
+ page.stream().map(HL7v2Message::getName).forEach(context::output);
+ }
}
}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOWriteIT.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOWriteIT.java
index d261d96..f52d607 100644
---
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOWriteIT.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IOWriteIT.java
@@ -18,22 +18,25 @@
package org.apache.beam.sdk.io.gcp.healthcare;
import static
org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HEALTHCARE_DATASET_TEMPLATE;
+import static
org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.HL7V2_INDEXING_TIMEOUT_MINUTES;
import static org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.MESSAGES;
import static
org.apache.beam.sdk.io.gcp.healthcare.HL7v2IOTestUtil.deleteAllHL7v2Messages;
-import static org.junit.Assert.assertEquals;
+import com.google.api.services.healthcare.v1beta1.model.Hl7V2Store;
import java.io.IOException;
import java.security.SecureRandom;
-import org.apache.beam.sdk.Pipeline;
+import java.util.concurrent.TimeoutException;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
-import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
+import org.joda.time.Duration;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -46,12 +49,15 @@ public class HL7v2IOWriteIT {
private static final String HL7V2_STORE_NAME =
"hl7v2_store_write_it_" + System.currentTimeMillis() + "_" + (new
SecureRandom().nextInt(32));
+ @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
@BeforeClass
public static void createHL7v2tore() throws IOException {
String project =
TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
healthcareDataset = String.format(HEALTHCARE_DATASET_TEMPLATE, project);
HealthcareApiClient client = new HttpHealthcareApiClient();
- client.createHL7v2Store(healthcareDataset, HL7V2_STORE_NAME);
+ Hl7V2Store store = client.createHL7v2Store(healthcareDataset,
HL7V2_STORE_NAME);
+ store.getParserConfig();
}
@AfterClass
@@ -65,7 +71,6 @@ public class HL7v2IOWriteIT {
if (client == null) {
client = new HttpHealthcareApiClient();
}
- PipelineOptions options =
TestPipeline.testingPipelineOptions().as(GcpOptions.class);
}
@After
@@ -74,8 +79,7 @@ public class HL7v2IOWriteIT {
}
@Test
- public void testHL7v2IOWrite() throws IOException {
- Pipeline pipeline = Pipeline.create();
+ public void testHL7v2IOWrite() throws Exception {
HL7v2IO.Write.Result result =
pipeline
.apply(Create.of(MESSAGES).withCoder(new HL7v2MessageCoder()))
@@ -84,10 +88,15 @@ public class HL7v2IOWriteIT {
PAssert.that(result.getFailedInsertsWithErr()).empty();
pipeline.run().waitUntilFinish();
- long numWrittenMessages =
- client
- .getHL7v2MessageStream(healthcareDataset + "/hl7V2Stores/" +
HL7V2_STORE_NAME)
- .count();
- assertEquals(MESSAGES.size(), numWrittenMessages);
+
+ try {
+ HL7v2IOTestUtil.waitForHL7v2Indexing(
+ client,
+ healthcareDataset + "/hl7V2Stores/" + HL7V2_STORE_NAME,
+ MESSAGES.size(),
+ Duration.standardMinutes(HL7V2_INDEXING_TIMEOUT_MINUTES));
+ } catch (TimeoutException e) {
+ Assert.fail(e.getMessage());
+ }
}
}