[ 
https://issues.apache.org/jira/browse/BEAM-4772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16542126#comment-16542126
 ] 

Tanner Cecchetti edited comment on BEAM-4772 at 7/12/18 7:47 PM:
-----------------------------------------------------------------

It looks to me like the issue is with CompressedSource. I don't think it 
correctly inherits the value of its delegate's emptyMatchTreatment property.

To illustrate:
{code:java}
// some arbitrary delegate -- note that EmptyMatchTreatment.ALLOW is passed
final FileBasedSource<String> delegate =
    new FileBasedSource<String>(StaticValueProvider.of("abc"), EmptyMatchTreatment.ALLOW, 0L) {
      @Override
      protected FileBasedSource<String> createForSubrangeOfFile(
          MatchResult.Metadata fileMetadata, long start, long end) {
        return null; // stub; not exercised in this example
      }

      @Override
      protected FileBasedReader<String> createSingleFileReader(PipelineOptions options) {
        return null; // stub; not exercised in this example
      }
    };

// construct CompressedSource from it
final CompressedSource<String> compressedSource = CompressedSource.from(delegate);

// check what emptyMatchTreatment is set to
System.out.println(delegate.getEmptyMatchTreatment()); // <- ALLOW
System.out.println(compressedSource.getEmptyMatchTreatment()); // <- DISALLOW
{code}
That looks incorrect to me: I would expect the CompressedSource to report ALLOW as well.

It seems like the fix might be to change the super() calls within 
CompressedSource to include the delegate's emptyMatchTreatment. For instance,
{code:java}
private CompressedSource(
    FileBasedSource<T> sourceDelegate,
    CompressedSource.DecompressingChannelFactory channelFactory) {
  super(sourceDelegate.getFileOrPatternSpecProvider(), Long.MAX_VALUE);
  this.sourceDelegate = sourceDelegate;
  this.channelFactory = channelFactory;
}
{code}
would become
{code:java}
private CompressedSource(
    FileBasedSource<T> sourceDelegate,
    CompressedSource.DecompressingChannelFactory channelFactory) {
  super(
      sourceDelegate.getFileOrPatternSpecProvider(),
      sourceDelegate.getEmptyMatchTreatment(),
      Long.MAX_VALUE);
  this.sourceDelegate = sourceDelegate;
  this.channelFactory = channelFactory;
}
{code}
I believe this would make my previous example "pass" (in other words, it would 
print ALLOW in both statements).
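
To show the pattern without needing a Beam checkout, here is a minimal, self-contained sketch. Every name in it (EmptyMatchDemo, Treatment, BaseSource, DroppingWrapper, ForwardingWrapper) is a hypothetical stand-in, not Beam's API. It also assumes that the shorter superclass constructor falls back to DISALLOW, which is what the output above suggests but which I have not confirmed in the Beam source.

```java
// Simplified stand-ins for the Beam classes; none of these names are Beam's API.
public class EmptyMatchDemo {
  enum Treatment { ALLOW, DISALLOW }

  // Plays the role of FileBasedSource. Assumption: the one-argument
  // constructor defaults the treatment to DISALLOW.
  static class BaseSource {
    private final String spec;
    private final Treatment treatment;

    BaseSource(String spec) {
      this(spec, Treatment.DISALLOW);
    }

    BaseSource(String spec, Treatment treatment) {
      this.spec = spec;
      this.treatment = treatment;
    }

    String getSpec() {
      return spec;
    }

    Treatment getTreatment() {
      return treatment;
    }
  }

  // Mirrors the suspected bug: only the spec is forwarded, so the
  // delegate's treatment is silently replaced by the default.
  static class DroppingWrapper extends BaseSource {
    DroppingWrapper(BaseSource delegate) {
      super(delegate.getSpec());
    }
  }

  // Mirrors the proposed fix: the delegate's treatment is forwarded too.
  static class ForwardingWrapper extends BaseSource {
    ForwardingWrapper(BaseSource delegate) {
      super(delegate.getSpec(), delegate.getTreatment());
    }
  }

  public static void main(String[] args) {
    BaseSource delegate = new BaseSource("abc", Treatment.ALLOW);
    System.out.println(new DroppingWrapper(delegate).getTreatment());   // DISALLOW
    System.out.println(new ForwardingWrapper(delegate).getTreatment()); // ALLOW
  }
}
```

The only difference between the two wrappers is which superclass constructor they call, which is the same one-line change proposed for CompressedSource.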

 

I'm not sure whether this fixes the problem: I haven't recompiled Beam with 
this change or tested it. I'm just hoping it's helpful.

I found this by following TextIO.Read#expand, which calls 
TextIO.Read#getSource, which constructs a CompressedSource.


> TextIO.read transform does not respect .withEmptyMatchTreatment
> ---------------------------------------------------------------
>
>                 Key: BEAM-4772
>                 URL: https://issues.apache.org/jira/browse/BEAM-4772
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-core
>    Affects Versions: 2.5.0
>            Reporter: Samuel Waggoner
>            Assignee: Kenneth Knowles
>            Priority: Major
>
> I modified the MinimalWordCount example to reproduce. I expect the read 
> transform to read 0 lines rather than give an exception, since I used 
> EmptyMatchTreatment.ALLOW. I see the same behavior with ALLOW_IF_WILDCARD. 
> The EmptyMatchTreatment value seems to be ignored.
> {code:java}
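> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.TextIO;
> import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
> import org.apache.beam.sdk.options.PipelineOptions;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> public class MinimalWordCount {
>   public static void main(String[] args) {
>     PipelineOptions options = PipelineOptionsFactory.create();
>     Pipeline p = Pipeline.create(options);
>     p.apply(TextIO.read()
>             .from("gs://apache-beam-samples/doesnotexist/*")
>             .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
>         .apply(TextIO.write().to("wordcounts"));
>     p.run().waitUntilFinish();
>   }
> }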
> {code}
> {code:java}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.io.FileNotFoundException: No files matched spec: gs://apache-beam-samples/doesnotexist/*
>  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>  at org.apache.beam.sdk.Pipeline.run(Pipeline.java:297)
>  at org.apache.beam.examples.MinimalWordCount.main(MinimalWordCount.java:124)
> Caused by: java.io.FileNotFoundException: No files matched spec: gs://apache-beam-samples/doesnotexist/*
>  at org.apache.beam.sdk.io.FileSystems.maybeAdjustEmptyMatchResult(FileSystems.java:172)
>  at org.apache.beam.sdk.io.FileSystems.match(FileSystems.java:158)
>  at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:222)
>  at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:212)
>  at org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:91)
>  at org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:81)
> {code}
> We see this behavior with both DirectRunner and DataflowRunner.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
