This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 78c15648b51 Add Two Counter Metric in BigQuery Write Schema Transform 
(#25155)
78c15648b51 is described below

commit 78c15648b514f5e61b83c593715114516ea639fb
Author: nickuncaged1201 <[email protected]>
AuthorDate: Thu Feb 9 14:08:22 2023 -0800

    Add Two Counter Metric in BigQuery Write Schema Transform (#25155)
    
    * Added element counter and error counter for BQ write schema transform
    
    * Fixied styling issues with naming
    
    * Combined two trivial counter class for brevity. Used finishbundle 
annotation to reduce the number of calls to counter.inc() for better 
performance.
    
    * fix formatting
    
    ---------
    
    Co-authored-by: Nick Li <[email protected]>
---
 ...ueryStorageWriteApiSchemaTransformProvider.java |  42 ++++-
 ...StorageWriteApiSchemaTransformProviderTest.java | 169 ++++++++++++++++++---
 2 files changed, 189 insertions(+), 22 deletions(-)

diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
index 5f7851bba51..22e8abca35b 100644
--- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProvider.java
@@ -37,6 +37,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
 import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
@@ -45,8 +47,10 @@ import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollection.IsBounded;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
@@ -222,6 +226,30 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
       this.testBigQueryServices = testBigQueryServices;
     }
 
+    // A generic counter for PCollection of Row. Will be initialized with the 
given
+    // name argument. Performs element-wise counter of the input PCollection.
+    private static class ElementCounterFn extends DoFn<Row, Row> {
+      private Counter bqGenericElementCounter;
+      private Long elementsInBundle = 0L;
+
+      ElementCounterFn(String name) {
+        this.bqGenericElementCounter =
+            
Metrics.counter(BigQueryStorageWriteApiPCollectionRowTupleTransform.class, 
name);
+      }
+
+      @ProcessElement
+      public void process(ProcessContext c) {
+        this.elementsInBundle += 1;
+        c.output(c.element());
+      }
+
+      @FinishBundle
+      public void finish(FinishBundleContext c) {
+        this.bqGenericElementCounter.inc(this.elementsInBundle);
+        this.elementsInBundle = 0L;
+      }
+    }
+
     @Override
     public PCollectionRowTuple expand(PCollectionRowTuple input) {
       // Check that the input exists
@@ -241,7 +269,12 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
                         : Duration.standardSeconds(triggeringFrequency));
       }
 
-      WriteResult result = inputRows.apply(write);
+      Schema inputSchema = inputRows.getSchema();
+      WriteResult result =
+          inputRows
+              .apply("element-count", ParDo.of(new 
ElementCounterFn("element-counter")))
+              .setRowSchema(inputSchema)
+              .apply(write);
 
       Schema errorSchema =
           Schema.of(
@@ -263,7 +296,12 @@ public class BigQueryStorageWriteApiSchemaTransformProvider
                                   .build()))
               .setRowSchema(errorSchema);
 
-      return PCollectionRowTuple.of(OUTPUT_ERRORS_TAG, errorRows);
+      PCollection<Row> errorOutput =
+          errorRows
+              .apply("error-count", ParDo.of(new 
ElementCounterFn("error-counter")))
+              .setRowSchema(errorSchema);
+
+      return PCollectionRowTuple.of(OUTPUT_ERRORS_TAG, errorOutput);
     }
 
     BigQueryIO.Write<Row> createStorageWriteApiTransform() {
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
index c8e733c8458..c0a6df5f125 100644
--- 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/providers/BigQueryStorageWriteApiSchemaTransformProviderTest.java
@@ -21,24 +21,36 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThrows;
 
+import com.google.api.services.bigquery.model.TableRow;
+import java.io.Serializable;
 import java.time.LocalDateTime;
 import java.util.Arrays;
 import java.util.List;
+import java.util.function.Function;
+import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
 import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiPCollectionRowTupleTransform;
 import 
org.apache.beam.sdk.io.gcp.bigquery.providers.BigQueryStorageWriteApiSchemaTransformProvider.BigQueryStorageWriteApiSchemaTransformConfiguration;
 import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
 import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
 import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
+import org.apache.beam.sdk.metrics.MetricNameFilter;
+import org.apache.beam.sdk.metrics.MetricQueryResults;
+import org.apache.beam.sdk.metrics.MetricResult;
+import org.apache.beam.sdk.metrics.MetricResults;
+import org.apache.beam.sdk.metrics.MetricsFilter;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.Field;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.schemas.logicaltypes.SqlTypes;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionRowTuple;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TypeDescriptor;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
@@ -60,24 +72,6 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
           Field.of("number", FieldType.INT64),
           Field.of("dt", FieldType.logicalType(SqlTypes.DATETIME)));
 
-  private static final List<Row> ROWS =
-      Arrays.asList(
-          Row.withSchema(SCHEMA)
-              .withFieldValue("name", "a")
-              .withFieldValue("number", 1L)
-              .withFieldValue("dt", LocalDateTime.parse("2000-01-01T00:00:00"))
-              .build(),
-          Row.withSchema(SCHEMA)
-              .withFieldValue("name", "b")
-              .withFieldValue("number", 2L)
-              .withFieldValue("dt", LocalDateTime.parse("2000-01-02T00:00:00"))
-              .build(),
-          Row.withSchema(SCHEMA)
-              .withFieldValue("name", "c")
-              .withFieldValue("number", 3L)
-              .withFieldValue("dt", LocalDateTime.parse("2000-01-03T00:00:00"))
-              .build());
-
   @Rule public final transient TestPipeline p = TestPipeline.create();
 
   @Before
@@ -115,10 +109,28 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
         (BigQueryStorageWriteApiPCollectionRowTupleTransform)
             provider.from(config).buildTransform();
 
+    List<Row> testRows =
+        Arrays.asList(
+            Row.withSchema(SCHEMA)
+                .withFieldValue("name", "a")
+                .withFieldValue("number", 1L)
+                .withFieldValue("dt", 
LocalDateTime.parse("2000-01-01T00:00:00"))
+                .build(),
+            Row.withSchema(SCHEMA)
+                .withFieldValue("name", "b")
+                .withFieldValue("number", 2L)
+                .withFieldValue("dt", 
LocalDateTime.parse("2000-01-02T00:00:00"))
+                .build(),
+            Row.withSchema(SCHEMA)
+                .withFieldValue("name", "c")
+                .withFieldValue("number", 3L)
+                .withFieldValue("dt", 
LocalDateTime.parse("2000-01-03T00:00:00"))
+                .build());
+
     writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
     String tag = provider.inputCollectionNames().get(0);
 
-    PCollection<Row> rows = p.apply(Create.of(ROWS).withRowSchema(SCHEMA));
+    PCollection<Row> rows = p.apply(Create.of(testRows).withRowSchema(SCHEMA));
 
     PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
     PCollectionRowTuple result = input.apply(writeRowTupleTransform);
@@ -135,8 +147,125 @@ public class 
BigQueryStorageWriteApiSchemaTransformProviderTest {
     runWithConfig(config);
     p.run().waitUntilFinish();
 
+    
assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec)));
+    assertEquals(3, fakeDatasetService.getAllRows("project", "dataset", 
"simple_write").size());
+  }
+
+  @Test
+  public void testInputElementCount() throws Exception {
+    String tableSpec = "project:dataset.input_count";
+    BigQueryStorageWriteApiSchemaTransformConfiguration config =
+        
BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
+
+    runWithConfig(config);
+    PipelineResult result = p.run();
+
+    MetricResults metrics = result.metrics();
+    MetricQueryResults metricResults =
+        metrics.queryMetrics(
+            MetricsFilter.builder()
+                .addNameFilter(
+                    MetricNameFilter.named(
+                        
BigQueryStorageWriteApiPCollectionRowTupleTransform.class,
+                        "element-counter"))
+                .build());
+
+    Iterable<MetricResult<Long>> counters = metricResults.getCounters();
+    if (!counters.iterator().hasNext()) {
+      throw new RuntimeException("no counters available for the input element 
count");
+    }
+
+    Long expectedCount = 3L;
+    for (MetricResult<Long> count : counters) {
+      assertEquals(expectedCount, count.getAttempted());
+    }
+  }
+
+  public PCollectionRowTuple runWithError(
+      BigQueryStorageWriteApiSchemaTransformConfiguration config) {
+    BigQueryStorageWriteApiSchemaTransformProvider provider =
+        new BigQueryStorageWriteApiSchemaTransformProvider();
+
+    BigQueryStorageWriteApiPCollectionRowTupleTransform writeRowTupleTransform 
=
+        (BigQueryStorageWriteApiPCollectionRowTupleTransform)
+            provider.from(config).buildTransform();
+
+    Function<TableRow, Boolean> shouldFailRow =
+        (Function<TableRow, Boolean> & Serializable) tr -> 
tr.get("name").equals("a");
+    fakeDatasetService.setShouldFailRow(shouldFailRow);
+
+    TableRow row1 =
+        new TableRow()
+            .set("name", "a")
+            .set("number", 1L)
+            .set("dt", LocalDateTime.parse("2000-01-01T00:00:00"));
+    TableRow row2 =
+        new TableRow()
+            .set("name", "b")
+            .set("number", 2L)
+            .set("dt", LocalDateTime.parse("2000-01-02T00:00:00"));
+    TableRow row3 =
+        new TableRow()
+            .set("name", "c")
+            .set("number", 3L)
+            .set("dt", LocalDateTime.parse("2000-01-03T00:00:00"));
+
+    writeRowTupleTransform.setBigQueryServices(fakeBigQueryServices);
+    String tag = provider.inputCollectionNames().get(0);
+
+    PCollection<Row> rows =
+        p.apply(Create.of(row1, row2, row3))
+            .apply(
+                MapElements.into(TypeDescriptor.of(Row.class))
+                    .via((tableRow) -> BigQueryUtils.toBeamRow(SCHEMA, 
tableRow)))
+            .setRowSchema(SCHEMA);
+
+    PCollectionRowTuple input = PCollectionRowTuple.of(tag, rows);
+    PCollectionRowTuple result = input.apply(writeRowTupleTransform);
+
+    return result;
+  }
+
+  @Test
+  public void testSimpleWriteWithFailure() throws Exception {
+    String tableSpec = "project:dataset.simple_write_with_failure";
+    BigQueryStorageWriteApiSchemaTransformConfiguration config =
+        
BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
+
+    runWithError(config);
+    p.run().waitUntilFinish();
+
     
assertNotNull(fakeDatasetService.getTable(BigQueryHelpers.parseTableSpec(tableSpec)));
     assertEquals(
-        ROWS.size(), fakeDatasetService.getAllRows("project", "dataset", 
"simple_write").size());
+        2, fakeDatasetService.getAllRows("project", "dataset", 
"simple_write_with_failure").size());
+  }
+
+  @Test
+  public void testErrorCount() throws Exception {
+    String tableSpec = "project:dataset.error_count";
+    BigQueryStorageWriteApiSchemaTransformConfiguration config =
+        
BigQueryStorageWriteApiSchemaTransformConfiguration.builder().setTable(tableSpec).build();
+
+    runWithError(config);
+    PipelineResult result = p.run();
+
+    MetricResults metrics = result.metrics();
+    MetricQueryResults metricResults =
+        metrics.queryMetrics(
+            MetricsFilter.builder()
+                .addNameFilter(
+                    MetricNameFilter.named(
+                        
BigQueryStorageWriteApiPCollectionRowTupleTransform.class, "error-counter"))
+                .build());
+
+    Iterable<MetricResult<Long>> counters = metricResults.getCounters();
+    if (!counters.iterator().hasNext()) {
+      throw new RuntimeException("no counters available ");
+    }
+
+    Long expectedCount = 1L;
+    for (MetricResult<Long> count : counters) {
+      assertEquals(expectedCount, count.getAttempted());
+    }
   }
 }

Reply via email to