[ https://issues.apache.org/jira/browse/BEAM-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542126#comment-16542126 ]
Tanner Cecchetti edited comment on BEAM-4772 at 7/12/18 7:41 PM:
-----------------------------------------------------------------

It looks to me like the issue is with CompressedSource. I don't think it correctly inherits the value of its delegate's emptyMatchTreatment property. To illustrate:

{code:java}
// some arbitrary delegate -- note that EmptyMatchTreatment.ALLOW is passed
final FileBasedSource<String> delegate = new FileBasedSource<String>(StaticValueProvider.of("abc"), EmptyMatchTreatment.ALLOW, 0L) {
  protected FileBasedSource<String> createForSubrangeOfFile(MatchResult.Metadata fileMetadata, long start, long end) {return null;}
  protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {return null;}
};

// construct CompressedSource from it
final CompressedSource compressedSource = CompressedSource.from(delegate);

// check what emptyMatchTreatment is set to
System.out.println(delegate.getEmptyMatchTreatment()); // <- ALLOW
System.out.println(compressedSource.getEmptyMatchTreatment()); // <- DISALLOW
{code}

That seems like it might be incorrect to me.

It seems like the fix might be to change the super() calls within CompressedSource to include the delegate's emptyMatchTreatment.
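The same constructor-chaining gap can be shown without Beam on the classpath. This is only a minimal sketch: MatchPolicy, BaseSource, LossyWrapper and ForwardingWrapper are hypothetical stand-ins (not Beam classes), and the DISALLOW default in the one-argument constructor is an assumption chosen to mirror the output above.

```java
public class DelegateSketch {
    // Hypothetical stand-in for EmptyMatchTreatment.
    enum MatchPolicy { ALLOW, DISALLOW }

    // Hypothetical stand-in for a base source with two constructors.
    static class BaseSource {
        private final String spec;
        private final MatchPolicy policy;

        // Assumption: the shorter constructor falls back to DISALLOW.
        BaseSource(String spec) {
            this(spec, MatchPolicy.DISALLOW);
        }

        BaseSource(String spec, MatchPolicy policy) {
            this.spec = spec;
            this.policy = policy;
        }

        String getSpec() { return spec; }
        MatchPolicy getPolicy() { return policy; }
    }

    // Wrapper that calls the shorter super() and so drops the delegate's policy.
    static class LossyWrapper extends BaseSource {
        final BaseSource delegate;

        LossyWrapper(BaseSource delegate) {
            super(delegate.getSpec());
            this.delegate = delegate;
        }
    }

    // Wrapper that forwards the delegate's policy to super().
    static class ForwardingWrapper extends BaseSource {
        final BaseSource delegate;

        ForwardingWrapper(BaseSource delegate) {
            super(delegate.getSpec(), delegate.getPolicy());
            this.delegate = delegate;
        }
    }

    public static void main(String[] args) {
        BaseSource delegate = new BaseSource("abc", MatchPolicy.ALLOW);
        System.out.println(new LossyWrapper(delegate).getPolicy());      // DISALLOW
        System.out.println(new ForwardingWrapper(delegate).getPolicy()); // ALLOW
    }
}
```

LossyWrapper mirrors the behavior described above, and ForwardingWrapper mirrors the proposed change of passing the delegate's value to super().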
For instance,

{code:java}
private CompressedSource(FileBasedSource<T> sourceDelegate, CompressedSource.DecompressingChannelFactory channelFactory) {
  super(sourceDelegate.getFileOrPatternSpecProvider(), 9223372036854775807L);
  this.sourceDelegate = sourceDelegate;
  this.channelFactory = channelFactory;
}
{code}

would become

{code:java}
private CompressedSource(FileBasedSource<T> sourceDelegate, CompressedSource.DecompressingChannelFactory channelFactory) {
  super(sourceDelegate.getFileOrPatternSpecProvider(), sourceDelegate.getEmptyMatchTreatment(), 9223372036854775807L);
  this.sourceDelegate = sourceDelegate;
  this.channelFactory = channelFactory;
}
{code}

I believe this would make my previous example "pass" (in other words, it would print ALLOW in both statements). I'm not sure if this fixes the problem or not – I didn't recompile Beam with this change and test it. Just hoping this might be helpful.

> TextIO.read transform does not respect .withEmptyMatchTreatment
> ---------------------------------------------------------------
>
> Key: BEAM-4772
> URL: https://issues.apache.org/jira/browse/BEAM-4772
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.5.0
> Reporter: Samuel Waggoner
> Assignee: Kenneth Knowles
> Priority: Major
>
> I modified the MinimalWordCount example to reproduce. I expect the read
> transform to read 0 lines rather than give an exception, since I used
> EmptyMatchTreatment.ALLOW. I see the same behavior with ALLOW_IF_WILDCARD.
> The EmptyMatchTreatment value seems to be ignored.
> {code:java}
> public class MinimalWordCount {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline p = Pipeline.create(options);
>     p.apply(TextIO.read()
>             .from("gs://apache-beam-samples/doesnotexist/*")
>             .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
>         .apply(TextIO.write().to("wordcounts"));
>     p.run().waitUntilFinish();
>   }
> }
> {code}
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.FileNotFoundException: No files matched spec: gs://apache-beam-samples/doesnotexist/*
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.examples.MinimalWordCount.main(MinimalWordCount.java:124)
> Caused by: java.io.FileNotFoundException: No files matched spec: gs://apache-beam-samples/doesnotexist/*
>   at org.apache.beam.sdk.io.FileSystems.maybeAdjustEmptyMatchResult(FileSystems.java:172)
>   at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:158)
>   at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:222)
>   at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:212)
>   at org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:91)
>   at org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
> {code}
> We see this behavior both when using DirectRunner and DataflowRunner

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)