derrickaw commented on code in PR #39174:
URL: https://github.com/apache/beam/pull/39174#discussion_r3500496918
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java:
##########
@@ -271,9 +271,47 @@ public void processElement(ProcessContext context,
@StateId("count") ValueState<
private final String watchPathStr;
}
+ private static class AfterNumberOfNewOutputs
+ implements Watch.Growth.TerminationCondition<String, Integer> {
+ private final int numOutputs;
+
+ public AfterNumberOfNewOutputs(int numOutputs) {
+ this.numOutputs = numOutputs;
+ }
+
+ @Override
+ public org.apache.beam.sdk.coders.Coder<Integer> getStateCoder() {
+ return VarIntCoder.of();
+ }
+
+ @Override
+ public Integer forNewInput(org.joda.time.Instant now, String input) {
+ return 0;
+ }
+
+ @Override
+ public Integer onSeenNewOutput(org.joda.time.Instant now, Integer state) {
+ return state + 1;
+ }
+
+ @Override
+ public boolean canStopPolling(org.joda.time.Instant now, Integer state) {
+ return state >= numOutputs;
+ }
Review Comment:
done
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileIOTest.java:
##########
@@ -271,9 +271,47 @@ public void processElement(ProcessContext context,
@StateId("count") ValueState<
private final String watchPathStr;
}
+ private static class AfterNumberOfNewOutputs
+ implements Watch.Growth.TerminationCondition<String, Integer> {
+ private final int numOutputs;
+
+ public AfterNumberOfNewOutputs(int numOutputs) {
+ this.numOutputs = numOutputs;
+ }
+
+ @Override
+ public org.apache.beam.sdk.coders.Coder<Integer> getStateCoder() {
+ return VarIntCoder.of();
+ }
+
+ @Override
+ public Integer forNewInput(org.joda.time.Instant now, String input) {
+ return 0;
+ }
+
+ @Override
+ public Integer onSeenNewOutput(org.joda.time.Instant now, Integer state) {
+ return state + 1;
+ }
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]