[
https://issues.apache.org/jira/browse/BEAM-9273?focusedWorklogId=387694&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-387694
]
ASF GitHub Bot logged work on BEAM-9273:
----------------------------------------
Author: ASF GitHub Bot
Created on: 14/Feb/20 21:39
Start Date: 14/Feb/20 21:39
Worklog Time Spent: 10m
Work Description: je-ik commented on pull request #10816: [BEAM-9273]
Explicitly disable @RequiresTimeSortedInput on unsupported runners
URL: https://github.com/apache/beam/pull/10816#discussion_r379656927
##########
File path:
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/reflect/DoFnSignaturesTest.java
##########
@@ -1192,6 +1199,239 @@ public void bar(
assertThat(params.get(4), instanceOf(TaggedOutputReceiverParameter.class));
}
+ private interface FeatureTest {
+ void test();
+ }
+
+ private static class StatelessDoFn extends DoFn<String, String> implements
FeatureTest {
+ @ProcessElement
+ public void process(@Element String input) {}
+
+ @Override
+ public void test() {
+ assertThat(DoFnSignatures.isSplittable(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.isStateful(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesTimers(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesBagState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesMapState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesSetState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesValueState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesWatermarkHold(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.requiresTimeSortedInput(this),
SerializableMatchers.equalTo(false));
+ }
+ }
+
+ private static class StatefulWithValueState extends DoFn<KV<String, String>,
String>
+ implements FeatureTest {
+ @StateId("state")
+ private final StateSpec<ValueState<String>> state = StateSpecs.value();
+
+ @ProcessElement
+ public void process(@Element KV<String, String> input) {}
+
+ @Override
+ public void test() {
+ assertThat(DoFnSignatures.isSplittable(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.isStateful(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesTimers(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesBagState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesMapState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesSetState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesValueState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesWatermarkHold(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.requiresTimeSortedInput(this),
SerializableMatchers.equalTo(false));
+ }
+ }
+
+ private static class StatefulWithTimers extends DoFn<KV<String, String>,
String>
+ implements FeatureTest {
+ @TimerId("timer")
+ private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @ProcessElement
+ public void process(@Element KV<String, String> input) {}
+
+ @Override
+ public void test() {
+ assertThat(DoFnSignatures.isSplittable(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.isStateful(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesTimers(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesBagState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesMapState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesSetState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesValueState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesWatermarkHold(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.requiresTimeSortedInput(this),
SerializableMatchers.equalTo(false));
+ }
+
+ @OnTimer("timer")
+ public void onTimer() {}
+ }
+
+ private static class StatefulWithTimersAndValueState extends DoFn<KV<String,
String>, String>
+ implements FeatureTest {
+ @TimerId("timer")
+ private final TimerSpec timer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @StateId("state")
+ private final StateSpec<SetState<String>> state = StateSpecs.set();
+
+ @ProcessElement
+ public void process(@Element KV<String, String> input) {}
+
+ @Override
+ public void test() {
+ assertThat(DoFnSignatures.isSplittable(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.isStateful(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesTimers(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesBagState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesMapState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesSetState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesValueState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesWatermarkHold(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.requiresTimeSortedInput(this),
SerializableMatchers.equalTo(false));
+ }
+
+ @OnTimer("timer")
+ public void onTimer() {}
+ }
+
+ private static class StatefulWithSetState extends DoFn<KV<String, String>,
String>
+ implements FeatureTest {
+ @StateId("state")
+ private final StateSpec<SetState<String>> spec = StateSpecs.set();
+
+ @ProcessElement
+ public void process(@Element KV<String, String> input) {}
+
+ @Override
+ public void test() {
+ assertThat(DoFnSignatures.isSplittable(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.isStateful(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesTimers(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesBagState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesMapState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesSetState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesValueState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesWatermarkHold(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.requiresTimeSortedInput(this),
SerializableMatchers.equalTo(false));
+ }
+ }
+
+ private static class StatefulWithMapState extends DoFn<KV<String, String>,
String>
+ implements FeatureTest {
+ @StateId("state")
+ private final StateSpec<MapState<String, String>> spec = StateSpecs.map();
+
+ @ProcessElement
+ public void process(@Element KV<String, String> input) {}
+
+ @Override
+ public void test() {
+ assertThat(DoFnSignatures.isSplittable(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.isStateful(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesTimers(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesBagState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesMapState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesSetState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesValueState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesWatermarkHold(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.requiresTimeSortedInput(this),
SerializableMatchers.equalTo(false));
+ }
+ }
+
+ private static class StatefulWithWatermarkHoldState extends DoFn<KV<String,
String>, String>
+ implements FeatureTest {
+ @StateId("state")
+ private final StateSpec<WatermarkHoldState> spec =
+ StateSpecs.watermarkStateInternal(TimestampCombiner.LATEST);
+
+ @ProcessElement
+ public void process(@Element KV<String, String> input) {}
+
+ @Override
+ public void test() {
+ assertThat(DoFnSignatures.isSplittable(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.isStateful(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesTimers(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesBagState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesMapState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesSetState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesValueState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesWatermarkHold(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.requiresTimeSortedInput(this),
SerializableMatchers.equalTo(false));
+ }
+ }
+
+ private static class RequiresTimeSortedInput extends DoFn<KV<String,
String>, String>
+ implements FeatureTest {
+ @ProcessElement
+ @RequiresTimeSortedInput
+ public void process(@Element KV<String, String> input) {}
+
+ @Override
+ public void test() {
+ assertThat(DoFnSignatures.isSplittable(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.isStateful(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesTimers(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesBagState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesMapState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesSetState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesValueState(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.usesWatermarkHold(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.requiresTimeSortedInput(this),
SerializableMatchers.equalTo(true));
+ }
+ }
+
+ private static class Splittable extends DoFn<KV<String, Long>, String>
implements FeatureTest {
+ @ProcessElement
+ public void process(ProcessContext c, RestrictionTracker<OffsetRange, ?>
tracker) {}
+
+ @GetInitialRestriction
+ public OffsetRange getInitialRange(@Element KV<String, Long> element) {
+ return new OffsetRange(0L, element.getValue());
+ }
+
+ @Override
+ public void test() {
+ assertThat(DoFnSignatures.isSplittable(this),
SerializableMatchers.equalTo(true));
+ assertThat(DoFnSignatures.isStateful(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesTimers(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesBagState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesMapState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesSetState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesValueState(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.usesWatermarkHold(this),
SerializableMatchers.equalTo(false));
+ assertThat(DoFnSignatures.requiresTimeSortedInput(this),
SerializableMatchers.equalTo(false));
+ }
+ }
+
+ private final List<FeatureTest> tests =
+ Lists.newArrayList(
+ new StatelessDoFn(),
+ new StatefulWithValueState(),
+ new StatefulWithTimers(),
+ new StatefulWithTimersAndValueState(),
+ new StatefulWithSetState(),
+ new StatefulWithMapState(),
+ new StatefulWithWatermarkHoldState(),
+ new RequiresTimeSortedInput(),
+ new Splittable());
+
+ @Test
+ public void testAllDoFnFeatures() {
+ tests.forEach(FeatureTest::test);
Review comment:
I wanted to keep the declaration of a class and its features encapsulated in
a single class. I agree that this doesn't play well with the rest of
`DoFnSignaturesTest`. But because there is an effort underway to
supersede all this with "pipeline features", I think it is fine to keep it as
is for now and drop it as part of that effort later.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 387694)
Time Spent: 4h 40m (was: 4.5h)
> Explicitly fail pipeline with @RequiresTimeSortedInput with unsupported runner
> ------------------------------------------------------------------------------
>
> Key: BEAM-9273
> URL: https://issues.apache.org/jira/browse/BEAM-9273
> Project: Beam
> Issue Type: Improvement
> Components: sdk-java-core
> Reporter: Jan Lukavský
> Assignee: Jan Lukavský
> Priority: Major
> Fix For: 2.20.0
>
> Time Spent: 4h 40m
> Remaining Estimate: 0h
>
> Fail a pipeline that uses the @RequiresTimeSortedInput annotation at pipeline
> translation time when it is run with an unsupported runner. Currently, the
> unsupported runners are:
> - apex
> - portable flink
> - gearpump
> - dataflow
> - jet
> - samza
> - spark structured streaming
> These runners should reject the pipeline.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)