[
https://issues.apache.org/jira/browse/BEAM-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542126#comment-16542126
]
Tanner Cecchetti commented on BEAM-4772:
----------------------------------------
It looks to me like the issue is with CompressedSource. I don't think it
correctly inherits the value of its delegate's emptyMatchTreatment property.
To illustrate:
{code:java}
// Some arbitrary delegate; note that EmptyMatchTreatment.ALLOW is passed.
final FileBasedSource<String> delegate =
    new FileBasedSource<String>(
        StaticValueProvider.of("abc"), EmptyMatchTreatment.ALLOW, 0L) {
      @Override
      protected FileBasedSource<String> createForSubrangeOfFile(
          MatchResult.Metadata fileMetadata, long start, long end) {
        return null;
      }

      @Override
      protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
        return null;
      }
    };
// Construct a CompressedSource from it.
final CompressedSource<String> compressedSource = CompressedSource.from(delegate);
// Check what emptyMatchTreatment is set to.
System.out.println(delegate.getEmptyMatchTreatment()); // <- ALLOW
System.out.println(compressedSource.getEmptyMatchTreatment()); // <- DISALLOW
{code}
This seems like it might be incorrect to me.
It seems like the fix might be to change the super() calls within
CompressedSource to include the delegate's emptyMatchTreatment. For instance,
{code:java}
private CompressedSource(
    FileBasedSource<T> sourceDelegate,
    CompressedSource.DecompressingChannelFactory channelFactory) {
  super(sourceDelegate.getFileOrPatternSpecProvider(), 9223372036854775807L);
  this.sourceDelegate = sourceDelegate;
  this.channelFactory = channelFactory;
}
{code}
would become
{code:java}
private CompressedSource(
    FileBasedSource<T> sourceDelegate,
    CompressedSource.DecompressingChannelFactory channelFactory) {
  super(
      sourceDelegate.getFileOrPatternSpecProvider(),
      sourceDelegate.getEmptyMatchTreatment(),
      9223372036854775807L);
  this.sourceDelegate = sourceDelegate;
  this.channelFactory = channelFactory;
}
{code}
I believe this would make my previous example "pass" (in other words, it would
print ALLOW in both statements).
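The constructor-forwarding pattern can also be modeled outside Beam. The following is a minimal sketch, not Beam code: `BaseSource`, `BuggyWrapper`, `FixedWrapper`, and the local `EmptyMatchTreatment` enum are hypothetical stand-ins, and the assumption that the two-argument base constructor falls back to DISALLOW is inferred from the output shown in the first example above.

```java
public class EmptyMatchForwardingSketch {

  // Hypothetical stand-in for Beam's EmptyMatchTreatment enum.
  enum EmptyMatchTreatment { ALLOW, DISALLOW, ALLOW_IF_WILDCARD }

  // Hypothetical stand-in for FileBasedSource. The two-argument constructor
  // is ASSUMED to default to DISALLOW, matching the output observed above.
  static class BaseSource {
    private final String spec;
    private final EmptyMatchTreatment emptyMatchTreatment;
    private final long minBundleSize;

    BaseSource(String spec, long minBundleSize) {
      this(spec, EmptyMatchTreatment.DISALLOW, minBundleSize);
    }

    BaseSource(String spec, EmptyMatchTreatment emptyMatchTreatment, long minBundleSize) {
      this.spec = spec;
      this.emptyMatchTreatment = emptyMatchTreatment;
      this.minBundleSize = minBundleSize;
    }

    String getSpec() {
      return spec;
    }

    EmptyMatchTreatment getEmptyMatchTreatment() {
      return emptyMatchTreatment;
    }
  }

  // Mirrors the current behavior: the delegate's setting is dropped because
  // the two-argument super constructor is used.
  static class BuggyWrapper extends BaseSource {
    BuggyWrapper(BaseSource delegate) {
      super(delegate.getSpec(), Long.MAX_VALUE);
    }
  }

  // Mirrors the proposed change: the delegate's setting is forwarded.
  static class FixedWrapper extends BaseSource {
    FixedWrapper(BaseSource delegate) {
      super(delegate.getSpec(), delegate.getEmptyMatchTreatment(), Long.MAX_VALUE);
    }
  }

  public static void main(String[] args) {
    BaseSource delegate = new BaseSource("abc", EmptyMatchTreatment.ALLOW, 0L);
    System.out.println(new BuggyWrapper(delegate).getEmptyMatchTreatment()); // DISALLOW
    System.out.println(new FixedWrapper(delegate).getEmptyMatchTreatment()); // ALLOW
  }
}
```

This only demonstrates that forwarding the value through the super() call changes what the wrapper reports; it says nothing about whether other code paths in Beam consult the wrapper or the delegate.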
I'm not sure if this fixes the problem or not – I didn't recompile Beam with
this change and test it. Just hoping this might be helpful.
> TextIO.read transform does not respect .withEmptyMatchTreatment
> ---------------------------------------------------------------
>
> Key: BEAM-4772
> URL: https://issues.apache.org/jira/browse/BEAM-4772
> Project: Beam
> Issue Type: Bug
> Components: sdk-java-core
> Affects Versions: 2.5.0
> Reporter: Samuel Waggoner
> Assignee: Kenneth Knowles
> Priority: Major
>
> I modified the MinimalWordCount example to reproduce. I expect the read
> transform to read 0 lines rather than give an exception, since I used
> EmptyMatchTreatment.ALLOW. I see the same behavior with ALLOW_IF_WILDCARD.
> The EmptyMatchTreatment value seems to be ignored.
> {code:java}
> public class MinimalWordCount {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline p = Pipeline.create(options);
>     p.apply(TextIO.read()
>             .from("gs://apache-beam-samples/doesnotexist/*")
>             .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
>         .apply(TextIO.write().to("wordcounts"));
>     p.run().waitUntilFinish();
>   }
> }
> {code}
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.FileNotFoundException: No files matched spec: gs://apache-beam-samples/doesnotexist/*
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>   at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>   at org.apache.beam.examples.MinimalWordCount.main(MinimalWordCount.java:124)
> Caused by: java.io.FileNotFoundException: No files matched spec: gs://apache-beam-samples/doesnotexist/*
>   at org.apache.beam.sdk.io.FileSystems.maybeAdjustEmptyMatchResult(FileSystems.java:172)
>   at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:158)
>   at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:222)
>   at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:212)
>   at org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:91)
>   at org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
> {code}
> We see this behavior with both DirectRunner and DataflowRunner.