kunal703 opened a new issue, #30357:
URL: https://github.com/apache/beam/issues/30357

   ### What happened?
   
   Hello, I have created a streaming pipeline that takes input rows, groups them by a
   BQ table (bq_table), and writes them to Avro files, which are finally loaded into BQ
   using the batch load API. The job works fine, but I see a consistent error when
   trying to update the job. The job uses Dataflow Runner v2 and Apache Beam version
   2.52.0. I see a similar issue here: https://github.com/spotify/scio/issues/2488,
   which was closed after a Beam upgrade.
   
   ```
   Workflow failed. Causes: The new job is not compatible with
   2024-02-02_18_59_52-17530581663331815630. The original job (if it existed) has not
   been aborted., The Coder or type for step
   AutologBQ_data/FileIO.Write/WriteFiles/WriteShardedBundlesToTempFiles/GroupIntoShards
   has changed.
   ```
   The GroupIntoShards step above is a GroupByKey step.
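   If I understand WriteFiles correctly, GroupIntoShards groups the user elements by a
   sharded key, so its coder is derived from the element coder of the keyed PCollection
   feeding the write (the KvCoder of the destination string and the RowCoder in the code
   below). Purely as an illustration, that resolved coder can be printed at
   pipeline-construction time and diffed between the original submission and the update
   attempt:

   ```
   // Illustrative sketch only: print the coder and schema that WriteFiles will see,
   // so the output can be compared between the two submissions.
   // `inputWithKeys` and `input` are the PCollections built in the code below.
   System.out.println("Element coder feeding WriteFiles: " + inputWithKeys.getCoder());
   System.out.println("Input Row schema: " + input.getSchema());
   ```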
   
   Here is the relevant code used for the WriteFiles:
   ```
    FileIO.Write<String, KV<String, Row>> write =
        FileIO.<String, KV<String, Row>>writeDynamic()
            .by(new BqTableDestinationFn())            // destination = BQ table name
            .withDestinationCoder(StringUtf8Coder.of())
            .withNaming(new NamingStrategyAvroFn())
            .via(
                Contextful.fn(new ConvertToAvro(schemas)),
                Contextful.fn(new GetSink(schemas)))
            .withNumShards(1)
            .to(gcsUri);

    PCollection<KV<String, Row>> inputWithKeys =
        input
            .apply(
                "AddTrigger",
                Window.<Row>into(new GlobalWindows())
                    .triggering(
                        Repeatedly.forever(
                            AfterFirst.of(
                                AfterPane.elementCountAtLeast(maxBatchSize),
                                AfterProcessingTime.pastFirstElementInPane()
                                    .plusDelayOf(windowDuration))))
                    .withAllowedLateness(Duration.ZERO)
                    .discardingFiredPanes())
            .apply(WithKeys.of(new GetDestination()))  // key each Row by its BQ table
            .setCoder(KvCoder.of(StringUtf8Coder.of(), RowCoder.of(input.getSchema())));

    WriteFilesResult<String> writeFilesResult = inputWithKeys.apply(write);
   ```
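
   For context, the replacement job is submitted through the Dataflow update flow,
   roughly like the sketch below (class name, job name, and option values are
   placeholders; the real job configures more options):

   ```
   // Minimal sketch of the update submission (placeholder values only). The relevant
   // part is setUpdate(true) with the running pipeline's job name, which triggers the
   // Dataflow compatibility check that produces the error above.
   import org.apache.beam.runners.dataflow.DataflowRunner;
   import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.options.PipelineOptionsFactory;

   public class UpdateJob {
     public static void main(String[] args) {
       DataflowPipelineOptions options =
           PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
       options.setRunner(DataflowRunner.class);
       options.setStreaming(true);
       options.setJobName("existing-streaming-job"); // placeholder: must match the running job
       options.setUpdate(true);                      // replace the running job in place
       // options.setExperiments(java.util.Arrays.asList("use_runner_v2")); // if not already the default

       Pipeline pipeline = Pipeline.create(options);
       // ... build the pipeline shown above, ending in inputWithKeys.apply(write) ...
       pipeline.run();
     }
   }
   ```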
   
   I have checked the graph and it appears to be the same for the updated job, so I
   would not expect to see this error. Can I get some help with this?
   
   ### Issue Priority
   
   Priority: 2 (default / most bugs should be filed as P2)
   
   ### Issue Components
   
   - [ ] Component: Python SDK
   - [X] Component: Java SDK
   - [ ] Component: Go SDK
   - [ ] Component: Typescript SDK
   - [ ] Component: IO connector
   - [ ] Component: Beam YAML
   - [ ] Component: Beam examples
   - [ ] Component: Beam playground
   - [ ] Component: Beam katas
   - [ ] Component: Website
   - [ ] Component: Spark Runner
   - [ ] Component: Flink Runner
   - [ ] Component: Samza Runner
   - [ ] Component: Twister2 Runner
   - [ ] Component: Hazelcast Jet Runner
   - [ ] Component: Google Cloud Dataflow Runner

