lukecwik commented on a change in pull request #11862:
URL: https://github.com/apache/beam/pull/11862#discussion_r442552310
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -452,9 +454,16 @@ private Message fetchMessage(HealthcareApiClient client,
String msgId)
* with {@link ListHL7v2Messages#withInitialSplitDuration(Duration)}
*/
public static class ListHL7v2Messages extends PTransform<PBegin,
PCollection<HL7v2Message>> {
+ private static final TimestampMethod DEFAULT_TIMESTAMP_METHOD =
TimestampMethod.SEND_TIME;
private final ValueProvider<List<String>> hl7v2Stores;
- private final ValueProvider<String> filter;
+ private ValueProvider<String> filter;
private Duration initialSplitDuration;
+ private TimestampMethod timestampMethod;
+
+ enum TimestampMethod {
Review comment:
Could you add a Javadoc comment describing this enum and its values?
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -466,21 +475,63 @@ private Message fetchMessage(HealthcareApiClient client,
String msgId)
this.hl7v2Stores = hl7v2Stores;
this.filter = filter;
this.initialSplitDuration = null;
+ this.timestampMethod = DEFAULT_TIMESTAMP_METHOD;
}
- public ListHL7v2Messages withInitialSplitDuration(Duration
initialSplitDuration) {
+ /**
+ * Controls the initial splitting for parallelization of HL7v2 Messages
List requests based on
+ * disjoint sendTime filters of the specified duration.
+ *
+ * @param initialSplitDuration the initial split duration
+ * @return the list hl 7 v 2 messages
+ */
+ ListHL7v2Messages withInitialSplitDuration(Duration initialSplitDuration) {
this.initialSplitDuration = initialSplitDuration;
Review comment:
Please return a new instance of this transform instead of mutating the
existing one.
It is not uncommon for users to take one transform, apply it to the graph
once, then call withX on it and apply it elsewhere. We wouldn't want the
later mutation to ever affect the first application of the transform, and
this is easy to guard against using @AutoValue builders.
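To illustrate the pattern being asked for, here is a minimal plain-Java sketch. It is not the HL7v2IO code and does not use @AutoValue: the class names are hypothetical stand-ins, and it uses java.time.Duration only to stay dependency-free. The point is just that withX returns a copy and leaves the receiver untouched.

```java
import java.time.Duration;

// Hypothetical stand-in for a transform; not the actual HL7v2IO code.
final class ImmutableTransformSketch {

  /** Immutable configuration holder: every withX call returns a new copy. */
  static final class ListMessages {
    private final String filter;
    private final Duration initialSplitDuration; // null means "not set"

    ListMessages(String filter, Duration initialSplitDuration) {
      this.filter = filter;
      this.initialSplitDuration = initialSplitDuration;
    }

    /** Returns a copy with the new duration; the receiver is not modified. */
    ListMessages withInitialSplitDuration(Duration initialSplitDuration) {
      return new ListMessages(filter, initialSplitDuration);
    }

    String getFilter() {
      return filter;
    }

    Duration getInitialSplitDuration() {
      return initialSplitDuration;
    }
  }

  public static void main(String[] args) {
    ListMessages first = new ListMessages("example-filter", null);
    ListMessages second = first.withInitialSplitDuration(Duration.ofHours(1));
    // The first instance keeps its original (unset) duration.
    System.out.println(first.getInitialSplitDuration());
    System.out.println(second.getInitialSplitDuration());
  }
}
```

With this shape, applying `first` to the graph and later deriving `second` from it cannot change how `first` behaves.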
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -596,9 +664,14 @@ public void listMessages(
lastClaimedMilliSecond = cursor.getMillis();
}
- outputReceiver.output(msg);
+ switch (timestampMethod) {
+ case SEND_TIME:
+ outputReceiver.outputWithTimestamp(msg,
Instant.parse(msg.getSendTime()));
+ break;
+ case INPUT_ELEMENT:
+ outputReceiver.outputWithTimestamp(msg, context.timestamp());
Review comment:
You should use output(msg) rather than outputWithTimestamp here, since output()
already assigns the input element's timestamp by default. This also means you
no longer need to take a ProcessContext parameter.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -466,21 +475,63 @@ private Message fetchMessage(HealthcareApiClient client,
String msgId)
this.hl7v2Stores = hl7v2Stores;
this.filter = filter;
this.initialSplitDuration = null;
+ this.timestampMethod = DEFAULT_TIMESTAMP_METHOD;
}
- public ListHL7v2Messages withInitialSplitDuration(Duration
initialSplitDuration) {
+ /**
+ * Controls the initial splitting for parallelization of HL7v2 Messages
List requests based on
+ * disjoint sendTime filters of the specified duration.
+ *
+ * @param initialSplitDuration the initial split duration
+ * @return the list hl 7 v 2 messages
Review comment:
? This @return description ("the list hl 7 v 2 messages") reads as garbled;
please reword it.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -466,21 +475,63 @@ private Message fetchMessage(HealthcareApiClient client,
String msgId)
this.hl7v2Stores = hl7v2Stores;
this.filter = filter;
this.initialSplitDuration = null;
+ this.timestampMethod = DEFAULT_TIMESTAMP_METHOD;
}
- public ListHL7v2Messages withInitialSplitDuration(Duration
initialSplitDuration) {
+ /**
+ * Controls the initial splitting for parallelization of HL7v2 Messages
List requests based on
+ * disjoint sendTime filters of the specified duration.
+ *
+ * @param initialSplitDuration the initial split duration
+ * @return the list hl 7 v 2 messages
+ */
+ ListHL7v2Messages withInitialSplitDuration(Duration initialSplitDuration) {
this.initialSplitDuration = initialSplitDuration;
return this;
}
+ /**
+ * Controls if the output elements will be assigned a timestamp beased on
the sendTime property
+ * or the input element's timestamp.
+ *
+ * @param timestampMethod the timestamp method
+ * @return the list hl 7 v 2 messages
+ */
+ ListHL7v2Messages withTimestampMethod(TimestampMethod timestampMethod) {
Review comment:
How about `withOutputTimestampMethod`? Also, please check that the argument is
non-null.
nit: I'm torn between using an enum and just having two separate methods, one
per variant. Your call. Having two methods prevents the user from passing in
null here.
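A rough sketch of both options, in plain Java with hypothetical class names. The per-variant method names (`withSendTimeTimestamps`, `withInputElementTimestamps`) are made up for illustration and are not proposed API:

```java
import java.util.Objects;

// Hypothetical stand-in; only the enum values mirror the PR.
final class OutputTimestampSketch {

  enum TimestampMethod {
    SEND_TIME,
    INPUT_ELEMENT
  }

  static final class ListMessages {
    private final TimestampMethod timestampMethod;

    ListMessages(TimestampMethod timestampMethod) {
      // Fail fast at construction time rather than later inside a switch.
      this.timestampMethod =
          Objects.requireNonNull(timestampMethod, "timestampMethod must not be null");
    }

    // Option A: a single enum-taking method; null is rejected by the constructor.
    ListMessages withOutputTimestampMethod(TimestampMethod timestampMethod) {
      return new ListMessages(timestampMethod);
    }

    // Option B: one method per variant, so there is no argument that could be null.
    ListMessages withSendTimeTimestamps() {
      return new ListMessages(TimestampMethod.SEND_TIME);
    }

    ListMessages withInputElementTimestamps() {
      return new ListMessages(TimestampMethod.INPUT_ELEMENT);
    }

    TimestampMethod getTimestampMethod() {
      return timestampMethod;
    }
  }

  public static void main(String[] args) {
    ListMessages base = new ListMessages(TimestampMethod.SEND_TIME);
    System.out.println(base.withInputElementTimestamps().getTimestampMethod());
    try {
      base.withOutputTimestampMethod(null);
    } catch (NullPointerException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```

Option A keeps the API surface small and extends naturally if more variants are added; Option B removes the null case entirely at the cost of one method per variant.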
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -509,13 +561,17 @@ public ListHL7v2Messages
withInitialSplitDuration(Duration initialSplitDuration)
* @param filter the filter
*/
ListHL7v2MessagesFn(String filter) {
- this(StaticValueProvider.of(filter), null);
+ this(StaticValueProvider.of(filter), null, null);
Review comment:
We pass in null to the constructor and don't handle that case in the
switch statement. How about passing in the default?
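For context, a Java `switch` on a null enum reference throws a NullPointerException, so the null would only surface once the switch runs. A minimal sketch of chaining to the default instead, using hypothetical class and method names (only the enum values and the default constant name mirror the PR):

```java
// Hypothetical stand-in for the DoFn; not the actual ListHL7v2MessagesFn.
final class DefaultInsteadOfNullSketch {

  enum TimestampMethod {
    SEND_TIME,
    INPUT_ELEMENT
  }

  static final TimestampMethod DEFAULT_TIMESTAMP_METHOD = TimestampMethod.SEND_TIME;

  static final class ListFn {
    private final String filter;
    private final TimestampMethod timestampMethod;

    // Convenience constructor chains to the default instead of passing null.
    ListFn(String filter) {
      this(filter, DEFAULT_TIMESTAMP_METHOD);
    }

    ListFn(String filter, TimestampMethod timestampMethod) {
      this.filter = filter;
      this.timestampMethod = timestampMethod;
    }

    String getFilter() {
      return filter;
    }

    /** Describes which timestamp source would be used. */
    String timestampSource() {
      // A switch on a null enum reference throws NullPointerException.
      switch (timestampMethod) {
        case SEND_TIME:
          return "sendTime";
        case INPUT_ELEMENT:
          return "input element";
        default:
          throw new IllegalStateException("unhandled: " + timestampMethod);
      }
    }
  }

  public static void main(String[] args) {
    System.out.println(new ListFn("example-filter").timestampSource());
  }
}
```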
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -466,21 +475,63 @@ private Message fetchMessage(HealthcareApiClient client,
String msgId)
this.hl7v2Stores = hl7v2Stores;
this.filter = filter;
this.initialSplitDuration = null;
+ this.timestampMethod = DEFAULT_TIMESTAMP_METHOD;
}
- public ListHL7v2Messages withInitialSplitDuration(Duration
initialSplitDuration) {
+ /**
+ * Controls the initial splitting for parallelization of HL7v2 Messages
List requests based on
+ * disjoint sendTime filters of the specified duration.
+ *
+ * @param initialSplitDuration the initial split duration
+ * @return the list hl 7 v 2 messages
+ */
+ ListHL7v2Messages withInitialSplitDuration(Duration initialSplitDuration) {
this.initialSplitDuration = initialSplitDuration;
return this;
}
+ /**
+ * Controls if the output elements will be assigned a timestamp beased on
the sendTime property
+ * or the input element's timestamp.
+ *
+ * @param timestampMethod the timestamp method
+ * @return the list hl 7 v 2 messages
Review comment:
? This @return description ("the list hl 7 v 2 messages") reads as garbled;
please reword it.
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -90,9 +91,9 @@
* is mainly to catch scenarios where the upstream {@link PCollection}
contains IDs that are not
* valid or are not reachable due to permissions issues.
*
- * <p>Message Listing Message Listing with {@link HL7v2IO.ListHL7v2Messages}
supports batch use
- * cases where you want to process all the messages in an HL7v2 store or those
matching a
- * filter @see <a
+ * <p>Message Listing Message Listing with {@link ListHL7v2Messages} and
{@link ListHL7v2Messages}
Review comment:
?
```suggestion
* <p>Message Listing with {@link ListHL7v2Messages} and {@link
ListHL7v2Messages}
```
##########
File path:
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/healthcare/HL7v2IO.java
##########
@@ -466,21 +475,63 @@ private Message fetchMessage(HealthcareApiClient client,
String msgId)
this.hl7v2Stores = hl7v2Stores;
this.filter = filter;
this.initialSplitDuration = null;
+ this.timestampMethod = DEFAULT_TIMESTAMP_METHOD;
}
- public ListHL7v2Messages withInitialSplitDuration(Duration
initialSplitDuration) {
+ /**
+ * Controls the initial splitting for parallelization of HL7v2 Messages
List requests based on
+ * disjoint sendTime filters of the specified duration.
+ *
+ * @param initialSplitDuration the initial split duration
+ * @return the list hl 7 v 2 messages
+ */
+ ListHL7v2Messages withInitialSplitDuration(Duration initialSplitDuration) {
this.initialSplitDuration = initialSplitDuration;
return this;
}
+ /**
+ * Controls if the output elements will be assigned a timestamp beased on
the sendTime property
+ * or the input element's timestamp.
Review comment:
```suggestion
 * Controls if the output elements will be assigned a timestamp based
on the {@code sendTime} property
* or the input element's timestamp.
```