tiwarinaman commented on issue #21608:
URL: https://github.com/apache/beam/issues/21608#issuecomment-1192286626

   I saw that the new version (2.40.0) adds a method called 
**skipIfEmpty()**, which is available on the **writeCustomType()** 
implementation, as shown in the code below:
   
   ```java
   errorRecords.apply("WritingErrorRecords",
           TextIO.<String>writeCustomType().to(options.getBucketPath())
                   .withHeader("ID|ERROR_CODE|ERROR_MESSAGE")
                   .skipIfEmpty()
                   .withoutSharding()
                   .withSuffix(".txt")
                   .withShardNameTemplate("-SSS")
                   .withNumShards(1));
     ```
   
   This code works when the PCollection is empty, which is what we wanted. 
But when the PCollection is not empty, it throws the 
NullPointerException pasted below.
   
   ```java
   Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.NullPointerException: Cannot invoke "org.apache.beam.sdk.transforms.SerializableFunction.apply(Object)" because "this.formatFunction" is null
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
        at org.apache.beam.examples.WordCount.runWordCount(WordCount.java:97)
        at org.apache.beam.examples.WordCount.main(WordCount.java:113)
   Caused by: java.lang.NullPointerException: Cannot invoke "org.apache.beam.sdk.transforms.SerializableFunction.apply(Object)" because "this.formatFunction" is null
        at org.apache.beam.sdk.io.DynamicFileDestinations$ConstantFilenamePolicy.formatRecord(DynamicFileDestinations.java:51)
        at org.apache.beam.sdk.io.WriteFiles$WriteShardsIntoTempFilesFn.processElement(WriteFiles.java:953)
   ```
   
   I believe this happens because, when the **AutoValue_TextIO_TypedWrite** 
object is built, the **DynamicDestinations** setter is initially given null 
and the value is never updated afterwards. 
   I am referring to the Apache Beam code below, from the TextIO file.
   
   ```java
   public static <UserT> TypedWrite<UserT, Void> writeCustomType() {
       return (new AutoValue_TextIO_TypedWrite.Builder())
               .setFilenamePrefix((ValueProvider)null).setTempDirectory((ValueProvider)null)
               .setShardTemplate((String)null).setFilenameSuffix((String)null)
               .setFilenamePolicy((FileBasedSink.FilenamePolicy)null)
               .setDynamicDestinations((FileBasedSink.DynamicDestinations)null)
               .setDelimiter(new char[]{'\n'})
               .setWritableByteChannelFactory(org.apache.beam.sdk.io.FileBasedSink.CompressionType.UNCOMPRESSED)
               .setWindowedWrites(false).setNoSpilling(false).setSkipIfEmpty(false)
               .build();
   }
   ```
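
   To show why the failure would only appear with a non-empty PCollection, 
here is a minimal standalone sketch in plain Java with no Beam dependency. 
`FormatFunctionSketch`, `SketchWrite`, and `failsWithNpe` are hypothetical 
names made up for illustration; this is not the Beam implementation. It only 
assumes what the stack trace suggests: the field named in the message, 
`formatFunction`, starts out null and is dereferenced once per element. If 
that is right, the Beam-side equivalent of the fix would presumably be calling 
`withFormatFunction(...)` on the `TypedWrite`, but I have not verified that 
against 2.40.0.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for a typed write transform; NOT Beam code.
public class FormatFunctionSketch {

    static final class SketchWrite<T> {
        // Assumption: left null by default, like a builder that sets fields to null.
        private Function<T, String> formatFunction;

        SketchWrite<T> withFormatFunction(Function<T, String> fn) {
            this.formatFunction = fn;
            return this;
        }

        // The function is only dereferenced inside the loop, once per element,
        // so an empty input never touches the null field.
        List<String> write(List<T> elements) {
            List<String> lines = new ArrayList<>();
            for (T element : elements) {
                lines.add(formatFunction.apply(element));
            }
            return lines;
        }
    }

    // Returns true if writing the input without a format function throws an NPE.
    static boolean failsWithNpe(List<String> input) {
        try {
            new SketchWrite<String>().write(input);
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // Empty input, no format function set.
        System.out.println(failsWithNpe(Collections.emptyList()));
        // One element, no format function set.
        System.out.println(failsWithNpe(Collections.singletonList("1|E42|bad record")));
        // One element, with an identity format function set.
        List<String> out = new SketchWrite<String>()
                .withFormatFunction(s -> s)
                .write(Collections.singletonList("1|E42|bad record"));
        System.out.println(out);
    }
}
```

   In this sketch the empty input succeeds, the single-element input without a 
format function hits the NullPointerException, and setting an identity 
function makes the same input go through.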
   
   Can somebody help me solve this problem? I'm working on a 
project and this issue is a **priority**.
   

