This is an automated email from the ASF dual-hosted git repository.

yichi pushed a commit to branch release-2.37.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.37.0 by this push:
     new 76a3b37  [BEAM-12164]: Fixes SpannerChangeStreamIT (#16806) (#16831)
76a3b37 is described below

commit 76a3b3762f834e37d542c5eaf2bf2ec2f8db0eca
Author: Yichi Zhang <[email protected]>
AuthorDate: Fri Feb 11 12:48:57 2022 -0800

    [BEAM-12164]: Fixes SpannerChangeStreamIT (#16806) (#16831)
    
    The Gson object was being initialized in a static context (the
    @BeforeClass method). When the DoFn that used it ran in another JVM,
    where that initialization had never happened, the field was null and
    we got a NullPointerException.
    
    This change moves the initialization of the Gson object into the DoFn
    itself, in its @Setup method, so each DoFn instance creates its own.
    
    Co-authored-by: Thiago Nunes <[email protected]>
---
 .../changestreams/it/SpannerChangeStreamIT.java    | 65 +++++++++++++---------
 1 file changed, 39 insertions(+), 26 deletions(-)

diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
index 03787cd..361e889 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;
 
-import static org.apache.beam.sdk.values.TypeDescriptors.strings;
-
 import com.google.cloud.Timestamp;
 import com.google.cloud.spanner.DatabaseClient;
 import com.google.cloud.spanner.Key;
@@ -33,9 +31,11 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
 import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -56,7 +56,6 @@ public class SpannerChangeStreamIT {
   private static String databaseId;
   private static String tableName;
   private static String changeStreamName;
-  private static Gson gson;
   private static DatabaseClient databaseClient;
 
   @BeforeClass
@@ -67,7 +66,6 @@ public class SpannerChangeStreamIT {
     tableName = ENV.createSingersTable();
     changeStreamName = ENV.createChangeStreamFor(tableName);
     databaseClient = ENV.getDatabaseClient();
-    gson = new Gson();
   }
 
   @Before
@@ -106,7 +104,7 @@ public class SpannerChangeStreamIT {
                     .withMetadataDatabase(databaseId)
                     .withInclusiveStartAt(startAt)
                     .withInclusiveEndAt(endAt))
-            
.apply(MapElements.into(strings()).via(SpannerChangeStreamIT::modsToString));
+            .apply(ParDo.of(new ModsToString()));
 
     // Each row is composed by the following data
     // <mod type, singer id, old first name, old last name, new first name, 
new last name>
@@ -188,25 +186,40 @@ public class SpannerChangeStreamIT {
         Collections.singletonList(Mutation.delete(tableName, 
Key.of(singerId))));
   }
 
-  private static String modsToString(DataChangeRecord record) {
-    final Mod mod = record.getMods().get(0);
-    final Map<String, String> keys = gson.fromJson(mod.getKeysJson(), 
Map.class);
-    final Map<String, String> oldValues =
-        Optional.ofNullable(mod.getOldValuesJson())
-            .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
-            .orElseGet(Collections::emptyMap);
-    final Map<String, String> newValues =
-        Optional.ofNullable(mod.getNewValuesJson())
-            .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
-            .orElseGet(Collections::emptyMap);
-
-    return String.join(
-        ",",
-        record.getModType().toString(),
-        keys.get("SingerId"),
-        oldValues.get("FirstName"),
-        oldValues.get("LastName"),
-        newValues.get("FirstName"),
-        newValues.get("LastName"));
+  private static class ModsToString extends DoFn<DataChangeRecord, String> {
+    private transient Gson gson;
+
+    @Setup
+    public void setup() {
+      gson = new Gson();
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element DataChangeRecord record, OutputReceiver<String> 
outputReceiver) {
+      final Mod mod = record.getMods().get(0);
+      final Map<String, String> keys = gson.fromJson(mod.getKeysJson(), 
Map.class);
+      final Map<String, String> oldValues =
+          Optional.ofNullable(mod.getOldValuesJson())
+              .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
+              .orElseGet(Collections::emptyMap);
+      final Map<String, String> newValues =
+          Optional.ofNullable(mod.getNewValuesJson())
+              .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
+              .orElseGet(Collections::emptyMap);
+
+      final String modsAsString =
+          String.join(
+              ",",
+              record.getModType().toString(),
+              keys.get("SingerId"),
+              oldValues.get("FirstName"),
+              oldValues.get("LastName"),
+              newValues.get("FirstName"),
+              newValues.get("LastName"));
+      final Instant timestamp = new 
Instant(record.getRecordTimestamp().toSqlTimestamp());
+
+      outputReceiver.outputWithTimestamp(modsAsString, timestamp);
+    }
   }
 }

Reply via email to