gemini-code-assist[bot] commented on code in PR #38265:
URL: https://github.com/apache/beam/pull/38265#discussion_r3138985616
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/payloads/JsonPayloadSerializerProvider.java:
##########
@@ -38,12 +40,42 @@ public String identifier() {
@Override
public PayloadSerializer getSerializer(Schema schema, Map<String, Object>
tableParams) {
- ObjectMapper deserializeMapper =
-
RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
- ObjectMapper serializeMapper =
- RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema));
- return PayloadSerializer.of(
- row -> RowJsonUtils.rowToJson(serializeMapper, row).getBytes(UTF_8),
- bytes -> RowJsonUtils.jsonToRow(deserializeMapper, new String(bytes,
UTF_8)));
+ return new JsonPayloadSerializer(schema);
+ }
+
+ private static class JsonPayloadSerializer implements PayloadSerializer {
+ private final Schema schema;
+ private transient @Nullable ObjectMapper deserializeMapper;
+ private transient @Nullable ObjectMapper serializeMapper;
Review Comment:

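The `ObjectMapper` fields should be marked as `volatile` if lazy initialization
uses double-checked locking to avoid synchronization overhead: the
unsynchronized first read is only guaranteed to see a fully constructed mapper
when the field is `volatile`. (With fully `synchronized` getters the lock
already provides that visibility, as long as the fields are only accessed
through them.)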
```suggestion
private transient volatile @Nullable ObjectMapper deserializeMapper;
private transient volatile @Nullable ObjectMapper serializeMapper;
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/payloads/JsonPayloadSerializerProvider.java:
##########
@@ -38,12 +40,42 @@ public String identifier() {
@Override
public PayloadSerializer getSerializer(Schema schema, Map<String, Object>
tableParams) {
- ObjectMapper deserializeMapper =
-
RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
- ObjectMapper serializeMapper =
- RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema));
- return PayloadSerializer.of(
- row -> RowJsonUtils.rowToJson(serializeMapper, row).getBytes(UTF_8),
- bytes -> RowJsonUtils.jsonToRow(deserializeMapper, new String(bytes,
UTF_8)));
+ return new JsonPayloadSerializer(schema);
+ }
+
+ private static class JsonPayloadSerializer implements PayloadSerializer {
+ private final Schema schema;
+ private transient @Nullable ObjectMapper deserializeMapper;
+ private transient @Nullable ObjectMapper serializeMapper;
+
+ public JsonPayloadSerializer(Schema schema) {
+ this.schema = schema;
+ this.deserializeMapper = null;
+ this.serializeMapper = null;
+ }
Review Comment:

Explicitly initializing the mapper fields to `null` in the constructor is
redundant in Java: reference-type fields already default to `null`. The
constructor can be reduced to the `schema` assignment.
```java
public JsonPayloadSerializer(Schema schema) {
this.schema = schema;
}
```
##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/payloads/JsonPayloadSerializerProvider.java:
##########
@@ -38,12 +40,42 @@ public String identifier() {
@Override
public PayloadSerializer getSerializer(Schema schema, Map<String, Object>
tableParams) {
- ObjectMapper deserializeMapper =
-
RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
- ObjectMapper serializeMapper =
- RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema));
- return PayloadSerializer.of(
- row -> RowJsonUtils.rowToJson(serializeMapper, row).getBytes(UTF_8),
- bytes -> RowJsonUtils.jsonToRow(deserializeMapper, new String(bytes,
UTF_8)));
+ return new JsonPayloadSerializer(schema);
+ }
+
+ private static class JsonPayloadSerializer implements PayloadSerializer {
+ private final Schema schema;
+ private transient @Nullable ObjectMapper deserializeMapper;
+ private transient @Nullable ObjectMapper serializeMapper;
+
+ public JsonPayloadSerializer(Schema schema) {
+ this.schema = schema;
+ this.deserializeMapper = null;
+ this.serializeMapper = null;
+ }
+
+ private synchronized ObjectMapper getDeserializeMapper() {
+ if (deserializeMapper == null) {
+ deserializeMapper =
RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
+ }
+ return deserializeMapper;
+ }
+
+ private synchronized ObjectMapper getSerializeMapper() {
+ if (serializeMapper == null) {
+ serializeMapper =
RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema));
+ }
+ return serializeMapper;
+ }
Review Comment:

Using `synchronized` on the entire method for every access to the
`ObjectMapper` can introduce contention in high-throughput scenarios, as these
methods are called for every record processed. Consider using double-checked
locking (with the fields marked as `volatile`) to optimize the common path
where the mappers are already initialized.
```java
private ObjectMapper getDeserializeMapper() {
ObjectMapper result = deserializeMapper;
if (result == null) {
synchronized (this) {
result = deserializeMapper;
if (result == null) {
deserializeMapper = result =
RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
}
}
}
return result;
}
private ObjectMapper getSerializeMapper() {
ObjectMapper result = serializeMapper;
if (result == null) {
synchronized (this) {
result = serializeMapper;
if (result == null) {
serializeMapper = result =
RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema));
}
}
}
return result;
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]