[ 
https://issues.apache.org/jira/browse/BEAM-9770?focusedWorklogId=424270&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-424270
 ]

ASF GitHub Bot logged work on BEAM-9770:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 17/Apr/20 16:09
            Start Date: 17/Apr/20 16:09
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on pull request #11437: 
[BEAM-9770] Add BigQueryIO deadletter pattern
URL: https://github.com/apache/beam/pull/11437#discussion_r410320527
 
 

 ##########
 File path: 
examples/java/src/main/java/org/apache/beam/examples/snippets/Snippets.java
 ##########
 @@ -753,35 +750,93 @@ public static void main(String[] args) {
       // [START CustomSessionWindow5]
 
       PCollection<TableRow> p =
-          Pipeline.create()
-              .apply(
-                  "Create data",
-                  Create.timestamped(
-                      TimestampedValue.of(
-                          new TableRow().set("user", "mobile").set("score", 
12).set("gap", 5),
-                          new Instant()),
-                      TimestampedValue.of(
-                          new TableRow().set("user", "desktop").set("score", 
4), new Instant()),
-                      TimestampedValue.of(
-                          new TableRow().set("user", "mobile").set("score", 
-3).set("gap", 5),
-                          new Instant().plus(2000)),
-                      TimestampedValue.of(
-                          new TableRow().set("user", "mobile").set("score", 
2).set("gap", 5),
-                          new Instant().plus(9000)),
-                      TimestampedValue.of(
-                          new TableRow().set("user", "mobile").set("score", 
7).set("gap", 5),
-                          new Instant().plus(12000)),
-                      TimestampedValue.of(
-                          new TableRow().set("user", "desktop").set("score", 
10),
-                          new Instant().plus(12000))));
+              Pipeline.create()
+                      .apply(
+                              "Create data",
+                              Create.timestamped(
+                                      TimestampedValue.of(
+                                              new TableRow().set("user", 
"mobile").set("score", 12).set("gap", 5),
+                                              new Instant()),
+                                      TimestampedValue.of(
+                                              new TableRow().set("user", 
"desktop").set("score", 4), new Instant()),
+                                      TimestampedValue.of(
+                                              new TableRow().set("user", 
"mobile").set("score", -3).set("gap", 5),
+                                              new Instant().plus(2000)),
+                                      TimestampedValue.of(
+                                              new TableRow().set("user", 
"mobile").set("score", 2).set("gap", 5),
+                                              new Instant().plus(9000)),
+                                      TimestampedValue.of(
+                                              new TableRow().set("user", 
"mobile").set("score", 7).set("gap", 5),
+                                              new Instant().plus(12000)),
+                                      TimestampedValue.of(
+                                              new TableRow().set("user", 
"desktop").set("score", 10),
+                                              new Instant().plus(12000))));
       // [END CustomSessionWindow5]
 
       // [START CustomSessionWindow6]
       p.apply(
-          "Window into sessions",
-          Window.<TableRow>into(
-              
DynamicSessions.withDefaultGapDuration(Duration.standardSeconds(10))));
+              "Window into sessions",
+              Window.<TableRow>into(
+                      
DynamicSessions.withDefaultGapDuration(Duration.standardSeconds(10))));
       // [END CustomSessionWindow6]
+    }
+
+    public static class DeadLetterBigQuery {
+      public static void deadletter(String[] args) {
+        // [START BigQueryIODeadLetter]
+        // Create pipeline
+        PipelineOptions options =
+                
PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryOptions.class);
+
+        Pipeline p = Pipeline.create(options);
+
+        // Create a bug by writing the 2nd value as null. The API will 
correctly
+        // throw an error when trying to insert a null value into a REQUIRED 
field.
+        WriteResult result =
+                p.apply(Create.of(1, 2))
+                        .apply(
+                                BigQueryIO.<Integer>write()
+                                        .withSchema(
+                                                new TableSchema()
+                                                        .setFields(
+                                                                
com.google.common.collect.ImmutableList.of(
+                                                                        new 
TableFieldSchema()
+                                                                               
 .setName("num")
+                                                                               
 .setType("INTEGER")
+                                                                               
 .setMode("REQUIRED"))))
+                                        .to("Test.dummyTable")
+                                        .withFormatFunction(x -> new 
TableRow().set("num", (x == 2) ? null : x))
+                                        
.withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())
 
 Review comment:
   Probably change this to "retryTransientErrors"? I'm afraid many users will 
just copy this pipeline as a template and keep this entry in their 
production pipelines.
 
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 424270)
    Time Spent: 20m  (was: 10m)

> Add BigQuery DeadLetter pattern to Patterns Page
> ------------------------------------------------
>
>                 Key: BEAM-9770
>                 URL: https://issues.apache.org/jira/browse/BEAM-9770
>             Project: Beam
>          Issue Type: New Feature
>          Components: website
>            Reporter: Reza ardeshir rokni
>            Priority: Trivial
>          Time Spent: 20m
>  Remaining Estimate: 0h
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to