WojciechM88 opened a new issue, #22314:
URL: https://github.com/apache/beam/issues/22314

   Hi, 
   I used Apache Beam in version 2.40.0
   I Write pipeline:
   `            Pipeline pipeline = Pipeline.create(options);
                
                PCollection<PubsubMessage> messages = pipeline .apply("Read 
PubSub Messages", 
PubsubIO.readMessagesWithAttributesAndMessageId().fromSubscription("subscription"));
   
                WriteResult out = messages.apply("WriteSuccessfulRecourds",
                                BigQueryIO.<PubsubMessage>write()
                                        .withoutValidation()
                                        .withFormatFunction((PubsubMessage 
elem) -> {
                                                System.out.println("Start 
transform: " + elem);
                                                TableRow tableRow = new 
TableRow().set("ID", "1").set("NAME", "TEST").set("DATE", new 
String(elem.getPayload()));//"2022-01-01T23:15");
                                                tableRow.set("messageCustom", 
elem);
                                                return tableRow;
                                        })
                                        
.withCreateDisposition(CreateDisposition.CREATE_NEVER)
                                        
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
                                        .withExtendedErrorInfo()
                                        
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
                                        
//.withTriggeringFrequency(Duration.millis(500))
                                        .withNumStorageWriteApiStreams(20)
                                        
.withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                                        .ignoreUnknownValues()
                                        //.withExtendedErrorInfo()
                                        .to(String.format("%s:%s.%s", project", 
"TEST_BIG_QUERY", "TEST_BIG_QUERY_TABLE")));
   
                out.getFailedStorageApiInserts().apply("Error", new 
PubsubMessageToTableRow(options));
   
                return pipeline.run();`
   
   And after 60 minutes i have this error:
   
   `Exception in thread "main" 
org.apache.beam.sdk.Pipeline$PipelineExecutionException: 
java.lang.IllegalArgumentException: Cannot output with timestamp 
294247-01-10T04:00:54.776Z. Output timestamps must be no earlier than the 
timestamp of the current input or timer (294247-01-10T04:00:54.776Z) minus the 
allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See 
the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed 
skew.
        at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:373)
        at 
org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:341)
        at 
org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:218)
        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
        at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
        at 
pl.woxtech.dataflow.pubsub.to.bigquery.GCPPubSubToBigQuery.run(GCPPubSubToBigQuery.java:132)
        at 
pl.woxtech.dataflow.pubsub.to.bigquery.GCPPubSubToBigQuery.main(GCPPubSubToBigQuery.java:73)
   Caused by: java.lang.IllegalArgumentException: Cannot output with timestamp 
294247-01-10T04:00:54.776Z. Output timestamps must be no earlier than the 
timestamp of the current input or timer (294247-01-10T04:00:54.776Z) minus the 
allowed skew (0 milliseconds) and no later than 294247-01-10T04:00:54.775Z. See 
the DoFn#getAllowedTimestampSkew() Javadoc for details on changing the allowed 
skew.
        at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.checkTimestamp(SimpleDoFnRunner.java:259)
        at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$1300(SimpleDoFnRunner.java:85)
        at 
org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$OnTimerArgumentProvider.output(SimpleDoFnRunner.java:843)
        at 
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
        at 
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.finalizeStream(StorageApiWritesShardedRecords.java:536)
        at 
org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn.onTimer(StorageApiWritesShardedRecords.java:550)`


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to