C0urante commented on code in PR #14005:
URL: https://github.com/apache/kafka/pull/14005#discussion_r1269901272


##########
connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java:
##########
@@ -597,7 +596,9 @@ private Set<TopicPartition> listPartitions(
             Admin admin,
             Collection<String> topics
     ) throws TimeoutException, InterruptedException, ExecutionException {
-        assertFalse("collection of topics may not be empty", topics.isEmpty());

Review Comment:
   Did the same in `assertConnectorAndExactlyNumTasksAreRunning`.
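For context, the removed `assertFalse` was a JUnit assertion living inside a shared test utility; a plain argument check keeps the guard without coupling the utility to an assertion library. A minimal sketch of that idea (hypothetical helper name, not the actual replacement in the PR):

```java
import java.util.Collection;

public class TopicArgs {
    // Fail fast with a standard exception instead of a JUnit assertion,
    // so callers see the same error regardless of test framework.
    static void requireNonEmpty(Collection<String> topics) {
        if (topics == null || topics.isEmpty()) {
            throw new IllegalArgumentException("collection of topics may not be empty");
        }
    }
}
```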



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##########
@@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) {
 
     static Map<String, Object> wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) {
         Map<String, Object> wrapped = new HashMap<>();
-        wrapped.put("topic", topicPartition.topic());
-        wrapped.put("partition", topicPartition.partition());
-        wrapped.put("cluster", sourceClusterAlias);
+        wrapped.put(TOPIC_KEY, topicPartition.topic());
+        wrapped.put(PARTITION_KEY, topicPartition.partition());
+        wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
         return wrapped;
     }
 
-    static Map<String, Object> wrapOffset(long offset) {
-        return Collections.singletonMap("offset", offset);
+    public static Map<String, Object> wrapOffset(long offset) {
+        return Collections.singletonMap(OFFSET_KEY, offset);
     }
 
-    static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
-        String topic = (String) wrapped.get("topic");
-        int partition = (Integer) wrapped.get("partition");
+    public static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
+        String topic = (String) wrapped.get(TOPIC_KEY);
+        int partition = (Integer) wrapped.get(PARTITION_KEY);
         return new TopicPartition(topic, partition);
     }
 
     static Long unwrapOffset(Map<String, ?> wrapped) {
-        if (wrapped == null || wrapped.get("offset") == null) {
+        if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
             return -1L;
         }
-        return (Long) wrapped.get("offset");
+        return (Long) wrapped.get(OFFSET_KEY);
+    }
+
+
+    /**
+     * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors.
+     * This method ensures that the key is present in the source partition map and that its value is a string.
+     *
+     * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)
+     * @see SourceRecord#sourcePartition()
+     *
+     * @param sourcePartition the to-be-validated source partition; may not be null
+     * @param key the key to check for in the source partition; may be null
+     *
+     * @throws ConnectException if the offset is invalid
+     */
+    static void validateSourcePartitionString(Map<String, ?> sourcePartition, String key) {
+        Objects.requireNonNull(sourcePartition, "Source partition may not be null");
+
+        if (!sourcePartition.containsKey(key))
+            throw new ConnectException(String.format(
+                    "Source partition %s is missing the '%s' key, which is required",
+                    sourcePartition,
+                    key
+            ));
+
+        Object value = sourcePartition.get(key);
+        if (!(value instanceof String)) {
+            throw new ConnectException(String.format(
+                    "Source partition %s has an invalid value %s for the '%s' key, which must be a string",
+                    sourcePartition,
+                    value,
+                    key
+            ));
+        }
+    }
+
+    /**
+     * Validate the {@link #PARTITION_KEY partition key} in a source partition that may be written to the offsets topic
+     * for one of the MM2 connectors.
+     * This method ensures that the key is present in the source partition map and that its value is a non-negative integer.
+     * <p/>
+     * Note that the partition key most likely refers to a partition in a Kafka topic, whereas the term "source partition" refers
+     * to a {@link SourceRecord#sourcePartition() source partition} that is stored in a Kafka Connect worker's internal offsets
+     * topic (or, if running in standalone mode, offsets file).
+     *
+     * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)
+     * @see SourceRecord#sourcePartition()
+     *
+     * @param sourcePartition the to-be-validated source partition; may not be null
+     *
+     * @throws ConnectException if the offset is invalid
+     */
+    static void validateSourcePartitionPartition(Map<String, ?> sourcePartition) {
+        Objects.requireNonNull(sourcePartition, "Source partition may not be null");
+
+        if (!sourcePartition.containsKey(PARTITION_KEY))
+            throw new ConnectException(String.format(
+                    "Source partition %s is missing the '%s' key, which is required",
+                    sourcePartition,
+                    PARTITION_KEY
+            ));
+
+        Object value = sourcePartition.get(PARTITION_KEY);
+        // The value may be encoded as a long but as long as it fits inside a 32-bit integer, that's fine
+        if (!(value instanceof Integer || value instanceof Long) || ((Number) value).longValue() > Integer.MAX_VALUE) {
+            throw new ConnectException(String.format(
+                    "Source partition %s has an invalid value %s for the '%s' key, which must be an integer",
+                    sourcePartition,
+                    value,
+                    PARTITION_KEY
+            ));
+        }
+
+        if (((Number) value).intValue() < 0) {
+            throw new ConnectException(String.format(
+                    "Source partition %s has an invalid value %s for the '%s' key, which cannot be negative",
+                    sourcePartition,
+                    value,
+                    PARTITION_KEY
+            ));
+        }
+    }
+
+    /**
+     * Validate a source offset that may be written to the offsets topic for one of the MM2 connectors.
+     *
+     * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)
+     * @see SourceRecord#sourceOffset()
+     *
+     * @param sourcePartition the corresponding {@link SourceRecord#sourcePartition() source partition} for the offset;
+     *                        may not be null
+     * @param sourceOffset the to-be-validated source offset; may be null (which is considered valid)
+     * @param permitPositiveValues whether positive values for the "offset" value in the source offset map
+     *                             should be permitted; if {@code true}, then all non-negative values are permitted; if
+     *                             {@code false}, only the value zero is permitted
+     *
+     * @throws ConnectException if the offset is invalid
+     */
+    static void validateSourceOffset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, boolean permitPositiveValues) {

Review Comment:
   Maybe `onlyOffsetZero` since this deals with validation of the source offset instead of the source partition?
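For illustration, a standalone sketch of how the suggested rename might read, with the diff's checks condensed and `IllegalArgumentException` standing in for `ConnectException` (hypothetical, not the final code in the PR):

```java
import java.util.Map;

public class OffsetCheck {
    static final String OFFSET_KEY = "offset";

    // When onlyOffsetZero is true, zero is the only legal offset value;
    // when false, any non-negative value is accepted.
    static void validateSourceOffset(Map<String, ?> sourceOffset, boolean onlyOffsetZero) {
        if (sourceOffset == null) {
            return; // a null source offset is treated as valid (e.g. a tombstone)
        }
        Object value = sourceOffset.get(OFFSET_KEY);
        if (!(value instanceof Integer || value instanceof Long)) {
            throw new IllegalArgumentException(
                    "'" + OFFSET_KEY + "' value must be an integer, got: " + value);
        }
        long offset = ((Number) value).longValue();
        if (offset < 0 || (onlyOffsetZero && offset != 0)) {
            throw new IllegalArgumentException("invalid offset value: " + offset);
        }
    }
}
```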



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -933,12 +938,94 @@ protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster conne
         }
     }
 
-    private static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
+    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
         for (Class<? extends Connector> connector : connectorClasses) {
             connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
         }
     }
 
+    @SafeVarargs
+    protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, Class<? extends Connector>... connectorClasses) throws InterruptedException {
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            connectCluster.resumeConnector(connectorClass.getSimpleName());
+        }
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            String connectorName = connectorClass.getSimpleName();
+            connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                    connectorName,
+                    1,
+                    "Connector '" + connectorName + "' and/or task did not resume in time"
+            );
+        }
+    }
+
+    protected static void alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, LongUnaryOperator alterOffset, String... topics) {

Review Comment:
   Ah yep, good call 👍 Split into two separate methods.
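A side note on the shape of `resumeMirrorMakerConnectors` in the diff above: it deliberately runs two passes, firing all resume requests first and only then waiting on each, so the waits overlap instead of running back to back. A generic sketch of that two-phase pattern (hypothetical `Cluster` interface, not the embedded cluster's real API):

```java
import java.util.List;

public class TwoPhase {
    interface Cluster {
        void resumeConnector(String name);
        boolean isRunning(String name);
    }

    static void resumeAll(Cluster cluster, List<String> names) {
        // Phase 1: fire all resume requests up front.
        for (String name : names) {
            cluster.resumeConnector(name);
        }
        // Phase 2: verify each connector (real tests would poll with a timeout;
        // the polling loop is elided in this sketch).
        for (String name : names) {
            if (!cluster.isRunning(name)) {
                throw new AssertionError("Connector '" + name + "' did not resume in time");
            }
        }
    }
}
```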



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##########
@@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) {
 
     static Map<String, Object> wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) {
         Map<String, Object> wrapped = new HashMap<>();
-        wrapped.put("topic", topicPartition.topic());
-        wrapped.put("partition", topicPartition.partition());
-        wrapped.put("cluster", sourceClusterAlias);
+        wrapped.put(TOPIC_KEY, topicPartition.topic());
+        wrapped.put(PARTITION_KEY, topicPartition.partition());
+        wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
         return wrapped;
     }
 
-    static Map<String, Object> wrapOffset(long offset) {
-        return Collections.singletonMap("offset", offset);
+    public static Map<String, Object> wrapOffset(long offset) {
+        return Collections.singletonMap(OFFSET_KEY, offset);
     }
 
-    static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
-        String topic = (String) wrapped.get("topic");
-        int partition = (Integer) wrapped.get("partition");
+    public static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
+        String topic = (String) wrapped.get(TOPIC_KEY);
+        int partition = (Integer) wrapped.get(PARTITION_KEY);
         return new TopicPartition(topic, partition);
     }
 
     static Long unwrapOffset(Map<String, ?> wrapped) {
-        if (wrapped == null || wrapped.get("offset") == null) {
+        if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
             return -1L;
         }
-        return (Long) wrapped.get("offset");
+        return (Long) wrapped.get(OFFSET_KEY);
+    }
+
+
+    /**
+     * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors.
+     * This method ensures that the key is present in the source partition map and that its value is a string.
+     *
+     * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)
+     * @see SourceRecord#sourcePartition()
+     *
+     * @param sourcePartition the to-be-validated source partition; may not be null
+     * @param key the key to check for in the source partition; may be null

Review Comment:
   It's unlikely that `Map::get` will lead to an NPE; Map implementations may optionally refuse null keys/values, but neither `HashMap` nor `LinkedHashMap` has that restriction. But you're correct that no callers pass in null keys, and I doubt we'll need support for that in the future, so will make the change 👍
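The point about null keys is easy to verify: `HashMap` (and `LinkedHashMap`) permit a single null key, so `map.get(null)` is a normal lookup rather than an NPE. A small self-contained demonstration:

```java
import java.util.HashMap;
import java.util.Map;

public class NullKeyDemo {
    public static void main(String[] args) {
        // HashMap permits a null key, so map.get(null) simply looks up
        // the null-key entry rather than throwing.
        Map<String, Object> map = new HashMap<>();
        map.put(null, "value-for-null-key");
        System.out.println(map.get(null));         // prints: value-for-null-key
        System.out.println(map.containsKey(null)); // prints: true
        System.out.println(map.get("missing"));    // prints: null
    }
}
```

By contrast, null-hostile implementations such as `TreeMap` with natural ordering would throw on the `put(null, ...)` call itself.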



##########
connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java:
##########
@@ -933,12 +938,94 @@ protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster conne
         }
     }
 
-    private static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
+    protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses)  {
         for (Class<? extends Connector> connector : connectorClasses) {
             connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false);
         }
     }
 
+    @SafeVarargs
+    protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, Class<? extends Connector>... connectorClasses) throws InterruptedException {
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            connectCluster.resumeConnector(connectorClass.getSimpleName());
+        }
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            String connectorName = connectorClass.getSimpleName();
+            connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning(
+                    connectorName,
+                    1,
+                    "Connector '" + connectorName + "' and/or task did not resume in time"
+            );
+        }
+    }
+
+    protected static void alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, LongUnaryOperator alterOffset, String... topics) {
+        Set<String> topicsSet = new HashSet<>(Arrays.asList(topics));
+
+        String connectorName = MirrorSourceConnector.class.getSimpleName();
+        connectCluster.stopConnector(connectorName);

Review Comment:
   Good catch! I've added a call to `connectCluster.assertions().assertConnectorIsStopped` whenever we stop a connector in these tests.
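The asynchrony is the subtle part here: issuing the stop request returns before the connector actually reaches the stopped state, so the added assertion is what blocks until the state change is visible. A generic sketch of that wait-for-state pattern (hypothetical helper, not the embedded cluster's own implementation):

```java
import java.util.function.BooleanSupplier;

public class WaitUtil {
    // Poll a condition until it holds or the timeout elapses.
    static void waitForCondition(BooleanSupplier condition, long timeoutMs, String message) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) {
                return; // condition observed before the deadline
            }
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new AssertionError("interrupted while waiting", e);
            }
        }
        throw new AssertionError(message);
    }
}
```

A test would call something like `waitForCondition(() -> connectorState().equals("STOPPED"), timeoutMs, "did not stop in time")` before going on to alter offsets.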



##########
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java:
##########
@@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) {
 
     static Map<String, Object> wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) {
         Map<String, Object> wrapped = new HashMap<>();
-        wrapped.put("topic", topicPartition.topic());
-        wrapped.put("partition", topicPartition.partition());
-        wrapped.put("cluster", sourceClusterAlias);
+        wrapped.put(TOPIC_KEY, topicPartition.topic());
+        wrapped.put(PARTITION_KEY, topicPartition.partition());
+        wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias);
         return wrapped;
     }
 
-    static Map<String, Object> wrapOffset(long offset) {
-        return Collections.singletonMap("offset", offset);
+    public static Map<String, Object> wrapOffset(long offset) {
+        return Collections.singletonMap(OFFSET_KEY, offset);
     }
 
-    static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
-        String topic = (String) wrapped.get("topic");
-        int partition = (Integer) wrapped.get("partition");
+    public static TopicPartition unwrapPartition(Map<String, ?> wrapped) {
+        String topic = (String) wrapped.get(TOPIC_KEY);
+        int partition = (Integer) wrapped.get(PARTITION_KEY);
         return new TopicPartition(topic, partition);
     }
 
     static Long unwrapOffset(Map<String, ?> wrapped) {
-        if (wrapped == null || wrapped.get("offset") == null) {
+        if (wrapped == null || wrapped.get(OFFSET_KEY) == null) {
             return -1L;
         }
-        return (Long) wrapped.get("offset");
+        return (Long) wrapped.get(OFFSET_KEY);
+    }
+
+
+    /**
+     * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors.
+     * This method ensures that the key is present in the source partition map and that its value is a string.
+     *
+     * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)
+     * @see SourceRecord#sourcePartition()
+     *
+     * @param sourcePartition the to-be-validated source partition; may not be null
+     * @param key the key to check for in the source partition; may be null
+     *
+     * @throws ConnectException if the offset is invalid
+     */
+    static void validateSourcePartitionString(Map<String, ?> sourcePartition, String key) {
+        Objects.requireNonNull(sourcePartition, "Source partition may not be null");
+
+        if (!sourcePartition.containsKey(key))
+            throw new ConnectException(String.format(
+                    "Source partition %s is missing the '%s' key, which is required",
+                    sourcePartition,
+                    key
+            ));
+
+        Object value = sourcePartition.get(key);
+        if (!(value instanceof String)) {
+            throw new ConnectException(String.format(
+                    "Source partition %s has an invalid value %s for the '%s' key, which must be a string",
+                    sourcePartition,
+                    value,
+                    key
+            ));
+        }
+    }
+
+    /**
+     * Validate the {@link #PARTITION_KEY partition key} in a source partition that may be written to the offsets topic
+     * for one of the MM2 connectors.
+     * This method ensures that the key is present in the source partition map and that its value is a non-negative integer.
+     * <p/>
+     * Note that the partition key most likely refers to a partition in a Kafka topic, whereas the term "source partition" refers
+     * to a {@link SourceRecord#sourcePartition() source partition} that is stored in a Kafka Connect worker's internal offsets
+     * topic (or, if running in standalone mode, offsets file).
+     *
+     * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map)
+     * @see SourceRecord#sourcePartition()
+     *
+     * @param sourcePartition the to-be-validated source partition; may not be null
+     *
+     * @throws ConnectException if the offset is invalid

Review Comment:
   Nice catch, thanks!
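As a standalone illustration of the partition check being documented in the hunk above, condensed from the diff with `IllegalArgumentException` standing in for `ConnectException` (hypothetical class and method names):

```java
import java.util.Map;

public class PartitionCheck {
    static final String PARTITION_KEY = "partition";

    // The value may arrive as a Long (e.g. after JSON deserialization) but
    // must fit in a non-negative 32-bit integer, mirroring the diff's logic.
    static int validatePartition(Map<String, ?> sourcePartition) {
        Object value = sourcePartition.get(PARTITION_KEY);
        if (!(value instanceof Integer || value instanceof Long)
                || ((Number) value).longValue() > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "'" + PARTITION_KEY + "' must be an integer, got: " + value);
        }
        int partition = ((Number) value).intValue();
        if (partition < 0) {
            throw new IllegalArgumentException(
                    "'" + PARTITION_KEY + "' cannot be negative: " + partition);
        }
        return partition;
    }
}
```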



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
