This is an automated email from the ASF dual-hosted git repository.
yichi pushed a commit to branch release-2.37.0
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.37.0 by this push:
new 76a3b37 [BEAM-12164]: Fixes SpannerChangeStreamIT (#16806) (#16831)
76a3b37 is described below
commit 76a3b3762f834e37d542c5eaf2bf2ec2f8db0eca
Author: Yichi Zhang <[email protected]>
AuthorDate: Fri Feb 11 12:48:57 2022 -0800
[BEAM-12164]: Fixes SpannerChangeStreamIT (#16806) (#16831)
The Gson object was being initialized in a static context (a static field of
the test class). When the DoFn that used it ran in another JVM, where that
field had never been initialized, it threw a NullPointerException (NPE).
Here, we move the initialization of the Gson object into the DoFn
itself (through a @Setup method).
Co-authored-by: Thiago Nunes <[email protected]>
---
.../changestreams/it/SpannerChangeStreamIT.java | 65 +++++++++++++---------
1 file changed, 39 insertions(+), 26 deletions(-)
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
index 03787cd..361e889 100644
--- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
+++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/it/SpannerChangeStreamIT.java
@@ -17,8 +17,6 @@
*/
package org.apache.beam.sdk.io.gcp.spanner.changestreams.it;
-import static org.apache.beam.sdk.values.TypeDescriptors.strings;
-
import com.google.cloud.Timestamp;
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Key;
@@ -33,9 +31,11 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.DataChangeRecord;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.model.Mod;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.lang3.tuple.Pair;
+import org.joda.time.Instant;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -56,7 +56,6 @@ public class SpannerChangeStreamIT {
private static String databaseId;
private static String tableName;
private static String changeStreamName;
- private static Gson gson;
private static DatabaseClient databaseClient;
@BeforeClass
@@ -67,7 +66,6 @@ public class SpannerChangeStreamIT {
tableName = ENV.createSingersTable();
changeStreamName = ENV.createChangeStreamFor(tableName);
databaseClient = ENV.getDatabaseClient();
- gson = new Gson();
}
@Before
@@ -106,7 +104,7 @@ public class SpannerChangeStreamIT {
.withMetadataDatabase(databaseId)
.withInclusiveStartAt(startAt)
.withInclusiveEndAt(endAt))
-
.apply(MapElements.into(strings()).via(SpannerChangeStreamIT::modsToString));
+ .apply(ParDo.of(new ModsToString()));
// Each row is composed by the following data
// <mod type, singer id, old first name, old last name, new first name,
new last name>
@@ -188,25 +186,40 @@ public class SpannerChangeStreamIT {
Collections.singletonList(Mutation.delete(tableName,
Key.of(singerId))));
}
- private static String modsToString(DataChangeRecord record) {
- final Mod mod = record.getMods().get(0);
- final Map<String, String> keys = gson.fromJson(mod.getKeysJson(),
Map.class);
- final Map<String, String> oldValues =
- Optional.ofNullable(mod.getOldValuesJson())
- .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
- .orElseGet(Collections::emptyMap);
- final Map<String, String> newValues =
- Optional.ofNullable(mod.getNewValuesJson())
- .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
- .orElseGet(Collections::emptyMap);
-
- return String.join(
- ",",
- record.getModType().toString(),
- keys.get("SingerId"),
- oldValues.get("FirstName"),
- oldValues.get("LastName"),
- newValues.get("FirstName"),
- newValues.get("LastName"));
+ private static class ModsToString extends DoFn<DataChangeRecord, String> {
+ private transient Gson gson;
+
+ @Setup
+ public void setup() {
+ gson = new Gson();
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element DataChangeRecord record, OutputReceiver<String>
outputReceiver) {
+ final Mod mod = record.getMods().get(0);
+ final Map<String, String> keys = gson.fromJson(mod.getKeysJson(),
Map.class);
+ final Map<String, String> oldValues =
+ Optional.ofNullable(mod.getOldValuesJson())
+ .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
+ .orElseGet(Collections::emptyMap);
+ final Map<String, String> newValues =
+ Optional.ofNullable(mod.getNewValuesJson())
+ .map(nonNullValues -> gson.fromJson(nonNullValues, Map.class))
+ .orElseGet(Collections::emptyMap);
+
+ final String modsAsString =
+ String.join(
+ ",",
+ record.getModType().toString(),
+ keys.get("SingerId"),
+ oldValues.get("FirstName"),
+ oldValues.get("LastName"),
+ newValues.get("FirstName"),
+ newValues.get("LastName"));
+ final Instant timestamp = new
Instant(record.getRecordTimestamp().toSqlTimestamp());
+
+ outputReceiver.outputWithTimestamp(modsAsString, timestamp);
+ }
}
}