gemini-code-assist[bot] commented on code in PR #38265:
URL: https://github.com/apache/beam/pull/38265#discussion_r3138985616


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/payloads/JsonPayloadSerializerProvider.java:
##########
@@ -38,12 +40,42 @@ public String identifier() {
 
   @Override
   public PayloadSerializer getSerializer(Schema schema, Map<String, Object> 
tableParams) {
-    ObjectMapper deserializeMapper =
-        
RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
-    ObjectMapper serializeMapper =
-        RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema));
-    return PayloadSerializer.of(
-        row -> RowJsonUtils.rowToJson(serializeMapper, row).getBytes(UTF_8),
-        bytes -> RowJsonUtils.jsonToRow(deserializeMapper, new String(bytes, 
UTF_8)));
+    return new JsonPayloadSerializer(schema);
+  }
+
+  private static class JsonPayloadSerializer implements PayloadSerializer {
+    private final Schema schema;
+    private transient @Nullable ObjectMapper deserializeMapper;
+    private transient @Nullable ObjectMapper serializeMapper;

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
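   The `ObjectMapper` fields must be marked as `volatile` if the lazy
   initialization is changed to double-checked locking (to avoid per-call
   synchronization overhead): the first, unsynchronized read of the field is
   only guaranteed to see a fully constructed mapper when the field is
   `volatile`. With the current fully `synchronized` getters the lock already
   provides that visibility, so `volatile` is needed only together with the
   double-checked locking change.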
   
   ```suggestion
       private transient volatile @Nullable ObjectMapper deserializeMapper;
       private transient volatile @Nullable ObjectMapper serializeMapper;
   ```



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/payloads/JsonPayloadSerializerProvider.java:
##########
@@ -38,12 +40,42 @@ public String identifier() {
 
   @Override
   public PayloadSerializer getSerializer(Schema schema, Map<String, Object> 
tableParams) {
-    ObjectMapper deserializeMapper =
-        
RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
-    ObjectMapper serializeMapper =
-        RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema));
-    return PayloadSerializer.of(
-        row -> RowJsonUtils.rowToJson(serializeMapper, row).getBytes(UTF_8),
-        bytes -> RowJsonUtils.jsonToRow(deserializeMapper, new String(bytes, 
UTF_8)));
+    return new JsonPayloadSerializer(schema);
+  }
+
+  private static class JsonPayloadSerializer implements PayloadSerializer {
+    private final Schema schema;
+    private transient @Nullable ObjectMapper deserializeMapper;
+    private transient @Nullable ObjectMapper serializeMapper;
+
+    public JsonPayloadSerializer(Schema schema) {
+      this.schema = schema;
+      this.deserializeMapper = null;
+      this.serializeMapper = null;
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Explicitly assigning `null` to `deserializeMapper` and `serializeMapper` in
   the constructor is redundant in Java: reference-typed fields are already
   initialized to their default value, `null`. The constructor only needs to
   assign `schema`:
   
   ```java
       public JsonPayloadSerializer(Schema schema) {
         this.schema = schema;
       }
   ```



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/payloads/JsonPayloadSerializerProvider.java:
##########
@@ -38,12 +40,42 @@ public String identifier() {
 
   @Override
   public PayloadSerializer getSerializer(Schema schema, Map<String, Object> 
tableParams) {
-    ObjectMapper deserializeMapper =
-        
RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
-    ObjectMapper serializeMapper =
-        RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema));
-    return PayloadSerializer.of(
-        row -> RowJsonUtils.rowToJson(serializeMapper, row).getBytes(UTF_8),
-        bytes -> RowJsonUtils.jsonToRow(deserializeMapper, new String(bytes, 
UTF_8)));
+    return new JsonPayloadSerializer(schema);
+  }
+
+  private static class JsonPayloadSerializer implements PayloadSerializer {
+    private final Schema schema;
+    private transient @Nullable ObjectMapper deserializeMapper;
+    private transient @Nullable ObjectMapper serializeMapper;
+
+    public JsonPayloadSerializer(Schema schema) {
+      this.schema = schema;
+      this.deserializeMapper = null;
+      this.serializeMapper = null;
+    }
+
+    private synchronized ObjectMapper getDeserializeMapper() {
+      if (deserializeMapper == null) {
+        deserializeMapper = 
RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
+      }
+      return deserializeMapper;
+    }
+
+    private synchronized ObjectMapper getSerializeMapper() {
+      if (serializeMapper == null) {
+        serializeMapper = 
RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema));
+      }
+      return serializeMapper;
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `synchronized` on the entire method for every access to the 
`ObjectMapper` can introduce contention in high-throughput scenarios, as these 
methods are called for every record processed. Consider using double-checked 
locking (with the fields marked as `volatile`) to optimize the common path 
where the mappers are already initialized.
   
   ```java
       private ObjectMapper getDeserializeMapper() {
         ObjectMapper result = deserializeMapper;
         if (result == null) {
           synchronized (this) {
             result = deserializeMapper;
             if (result == null) {
               deserializeMapper = result = 
RowJsonUtils.newObjectMapperWith(RowJsonDeserializer.forSchema(schema));
             }
           }
         }
         return result;
       }
   
       private ObjectMapper getSerializeMapper() {
         ObjectMapper result = serializeMapper;
         if (result == null) {
           synchronized (this) {
             result = serializeMapper;
             if (result == null) {
               serializeMapper = result = 
RowJsonUtils.newObjectMapperWith(RowJsonSerializer.forSchema(schema));
             }
           }
         }
         return result;
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to