C0urante commented on code in PR #14003:
URL: https://github.com/apache/kafka/pull/14003#discussion_r1262733289


##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1608,6 +1612,32 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map<Stri
         }));
     }
 
+    /**
+     * "Normalize" source connector offsets by serializing and deserializing them using the internal {@link JsonConverter}.
+     * This is done in order to prevent type mismatches between the offsets passed to {@link SourceConnector#alterOffsets(Map, Map)}
+     * and the offsets that connectors and tasks retrieve via an instance of {@link OffsetStorageReader}.
+     * <p>
+     * Visible for testing.
+     *
+     * @param originalOffsets the offsets that are to be normalized
+     * @return the normalized offsets
+     */
+    @SuppressWarnings("unchecked")
+    Map<Map<String, ?>, Map<String, ?>> normalizeSourceConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> originalOffsets) {
+        Map<Map<String, ?>, Map<String, ?>> normalizedOffsets = new HashMap<>();
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : originalOffsets.entrySet()) {
+            OffsetUtils.validateFormat(entry.getKey());
+            OffsetUtils.validateFormat(entry.getValue());
+            byte[] serializedKey = internalKeyConverter.fromConnectData("", null, entry.getKey());
+            byte[] serializedValue = internalKeyConverter.fromConnectData("", null, entry.getValue());

Review Comment:
   ```suggestion
               byte[] serializedValue = internalValueConverter.fromConnectData("", null, entry.getValue());
   ```



##########
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java:
##########
@@ -2015,6 +2019,33 @@ public void testAlterOffsetsSourceConnectorError() throws Exception {
         verifyKafkaClusterId();
     }
 
+    @Test
+    public void testNormalizeSourceConnectorOffsets() throws Exception {
+        Map<Map<String, ?>, Map<String, ?>> offsets = Collections.singletonMap(
+                Collections.singletonMap("filename", "/path/to/filename"),
+                Collections.singletonMap("position", 20)
+        );
+
+        assertTrue(offsets.values().iterator().next().get("position") instanceof Integer);
+
+        JsonConverter jsonConverter = new JsonConverter();
+        jsonConverter.configure(Collections.singletonMap(SCHEMAS_ENABLE_CONFIG, false), false);
+        when(plugins.newInternalConverter(eq(true), anyString(), anyMap()))
+                .thenReturn(jsonConverter);
+        when(plugins.newInternalConverter(eq(false), anyString(), anyMap()))
+                .thenReturn(jsonConverter);

Review Comment:
   Can we use the utility method here?
   ```suggestion
           mockInternalConverters();
   ```
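
For context on what this test protects against, here is a minimal, Connect-free sketch of the type mismatch described in the Javadoc. `OffsetTypeSketch` and `widenIntegers` are hypothetical names, not part of the PR, and the widening to `Long` is only an assumed stand-in for what a schemaless JSON round trip does to whole numbers; it is not the `JsonConverter` implementation.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class OffsetTypeSketch {

    // Hypothetical stand-in for the serialize/deserialize round trip:
    // widens small integral boxed types to Long. This is an assumption about
    // how whole numbers come back from a schemaless JSON deserializer.
    static Map<String, Object> widenIntegers(Map<String, ?> offset) {
        Map<String, Object> result = new HashMap<>();
        for (Map.Entry<String, ?> e : offset.entrySet()) {
            Object v = e.getValue();
            if (v instanceof Integer || v instanceof Short || v instanceof Byte) {
                result.put(e.getKey(), ((Number) v).longValue());
            } else {
                result.put(e.getKey(), v);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Offset as it might arrive in an alter-offsets request (Integer).
        Map<String, ?> fromRequest = Collections.singletonMap("position", 20);
        // Offset as it might come back from offset storage (Long).
        Map<String, ?> fromStorage = Collections.singletonMap("position", 20L);

        // Integer 20 and Long 20 are not equal, so the maps differ.
        System.out.println(fromRequest.equals(fromStorage));
        // After widening, both sides hold a Long and the maps match.
        System.out.println(widenIntegers(fromRequest).equals(fromStorage));
    }
}
```

A connector that casts the `position` value to `Long` would hit a `ClassCastException` on the un-normalized map, which is why the test asserts the value's type rather than only its numeric value.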



##########
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java:
##########
@@ -1608,6 +1612,32 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map<Stri
         }));
     }
 
+    /**
+     * "Normalize" source connector offsets by serializing and deserializing them using the internal {@link JsonConverter}.
+     * This is done in order to prevent type mismatches between the offsets passed to {@link SourceConnector#alterOffsets(Map, Map)}
+     * and the offsets that connectors and tasks retrieve via an instance of {@link OffsetStorageReader}.
+     * <p>
+     * Visible for testing.
+     *
+     * @param originalOffsets the offsets that are to be normalized
+     * @return the normalized offsets
+     */
+    @SuppressWarnings("unchecked")
+    Map<Map<String, ?>, Map<String, ?>> normalizeSourceConnectorOffsets(Map<Map<String, ?>, Map<String, ?>> originalOffsets) {
+        Map<Map<String, ?>, Map<String, ?>> normalizedOffsets = new HashMap<>();
+        for (Map.Entry<Map<String, ?>, Map<String, ?>> entry : originalOffsets.entrySet()) {
+            OffsetUtils.validateFormat(entry.getKey());
+            OffsetUtils.validateFormat(entry.getValue());
+            byte[] serializedKey = internalKeyConverter.fromConnectData("", null, entry.getKey());
+            byte[] serializedValue = internalKeyConverter.fromConnectData("", null, entry.getValue());
+            Object deserializedKey = internalKeyConverter.toConnectData("", serializedKey).value();
+            Object deserializedValue = internalKeyConverter.toConnectData("", serializedValue).value();

Review Comment:
   ```suggestion
               Object deserializedValue = internalValueConverter.toConnectData("", serializedValue).value();
   ```


