C0urante commented on code in PR #14005: URL: https://github.com/apache/kafka/pull/14005#discussion_r1269901272
########## connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java: ########## @@ -597,7 +596,9 @@ private Set<TopicPartition> listPartitions( Admin admin, Collection<String> topics ) throws TimeoutException, InterruptedException, ExecutionException { - assertFalse("collection of topics may not be empty", topics.isEmpty()); Review Comment: Did the same in `assertConnectorAndExactlyNumTasksAreRunning`. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ########## @@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) { static Map<String, Object> wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) { Map<String, Object> wrapped = new HashMap<>(); - wrapped.put("topic", topicPartition.topic()); - wrapped.put("partition", topicPartition.partition()); - wrapped.put("cluster", sourceClusterAlias); + wrapped.put(TOPIC_KEY, topicPartition.topic()); + wrapped.put(PARTITION_KEY, topicPartition.partition()); + wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias); return wrapped; } - static Map<String, Object> wrapOffset(long offset) { - return Collections.singletonMap("offset", offset); + public static Map<String, Object> wrapOffset(long offset) { + return Collections.singletonMap(OFFSET_KEY, offset); } - static TopicPartition unwrapPartition(Map<String, ?> wrapped) { - String topic = (String) wrapped.get("topic"); - int partition = (Integer) wrapped.get("partition"); + public static TopicPartition unwrapPartition(Map<String, ?> wrapped) { + String topic = (String) wrapped.get(TOPIC_KEY); + int partition = (Integer) wrapped.get(PARTITION_KEY); return new TopicPartition(topic, partition); } static Long unwrapOffset(Map<String, ?> wrapped) { - if (wrapped == null || wrapped.get("offset") == null) { + if (wrapped == null || wrapped.get(OFFSET_KEY) == null) { return -1L; } - return (Long) wrapped.get("offset"); + return (Long) wrapped.get(OFFSET_KEY); + } + + + /** + * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors. + * This method ensures that the key is present in the source partition map and that its value is a string. + * + * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map) + * @see SourceRecord#sourcePartition() + * + * @param sourcePartition the to-be-validated source partition; may not be null + * @param key the key to check for in the source partition; may be null + * + * @throws ConnectException if the offset is invalid + */ + static void validateSourcePartitionString(Map<String, ?> sourcePartition, String key) { + Objects.requireNonNull(sourcePartition, "Source partition may not be null"); + + if (!sourcePartition.containsKey(key)) + throw new ConnectException(String.format( + "Source partition %s is missing the '%s' key, which is required", + sourcePartition, + key + )); + + Object value = sourcePartition.get(key); + if (!(value instanceof String)) { + throw new ConnectException(String.format( + "Source partition %s has an invalid value %s for the '%s' key, which must be a string", + sourcePartition, + value, + key + )); + } + } + + /** + * Validate the {@link #PARTITION_KEY partition key} in a source partition that may be written to the offsets topic + * for one of the MM2 connectors. + * This method ensures that the key is present in the source partition map and that its value is a non-negative integer. 
+ * <p/> + * Note that the partition key most likely refers to a partition in a Kafka topic, whereas the term "source partition" refers + * to a {@link SourceRecord#sourcePartition() source partition} that is stored in a Kafka Connect worker's internal offsets + * topic (or, if running in standalone mode, offsets file). + * + * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map) + * @see SourceRecord#sourcePartition() + * + * @param sourcePartition the to-be-validated source partition; may not be null + * + * @throws ConnectException if the offset is invalid + */ + static void validateSourcePartitionPartition(Map<String, ?> sourcePartition) { + Objects.requireNonNull(sourcePartition, "Source partition may not be null"); + + if (!sourcePartition.containsKey(PARTITION_KEY)) + throw new ConnectException(String.format( + "Source partition %s is missing the '%s' key, which is required", + sourcePartition, + PARTITION_KEY + )); + + Object value = sourcePartition.get(PARTITION_KEY); + // The value may be encoded as a long but as long as it fits inside a 32-bit integer, that's fine + if (!(value instanceof Integer || value instanceof Long) || ((Number) value).longValue() > Integer.MAX_VALUE) { + throw new ConnectException(String.format( + "Source partition %s has an invalid value %s for the '%s' key, which must be an integer", + sourcePartition, + value, + PARTITION_KEY + )); + } + + if (((Number) value).intValue() < 0) { + throw new ConnectException(String.format( + "Source partition %s has an invalid value %s for the '%s' key, which cannot be negative", + sourcePartition, + value, + PARTITION_KEY + )); + } + } + + /** + * Validate a source offset that may be written to the offsets topic for one of the MM2 connectors. + * + * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map) + * @see SourceRecord#sourceOffset() + * + * @param sourcePartition the corresponding {@link SourceRecord#sourcePartition() source partition} for the offset; + * may not be null + * @param sourceOffset the to-be-validated source offset; may be null (which is considered valid) + * @param permitPositiveValues whether positive values for the "offset" value in the source offset map + * should be permitted; if {@code true}, then all non-negative values are permitted; if + * {@code false}, only the value zero is permitted + * + * @throws ConnectException if the offset is invalid + */ + static void validateSourceOffset(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, boolean permitPositiveValues) { Review Comment: Maybe `onlyOffsetZero` since this deals with validation of the source offset instead of the source partition? ########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ########## @@ -933,12 +938,94 @@ protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster conne } } - private static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses) { + protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses) { for (Class<? extends Connector> connector : connectorClasses) { connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false); } } + @SafeVarargs + protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, Class<? extends Connector>... 
connectorClasses) throws InterruptedException { + for (Class<? extends Connector> connectorClass : connectorClasses) { + connectCluster.resumeConnector(connectorClass.getSimpleName()); + } + for (Class<? extends Connector> connectorClass : connectorClasses) { + String connectorName = connectorClass.getSimpleName(); + connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning( + connectorName, + 1, + "Connector '" + connectorName + "' and/or task did not resume in time" + ); + } + } + + protected static void alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, LongUnaryOperator alterOffset, String... topics) { Review Comment: Ah yep, good call 👍 Split into two separate methods. ########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ########## @@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) { static Map<String, Object> wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) { Map<String, Object> wrapped = new HashMap<>(); - wrapped.put("topic", topicPartition.topic()); - wrapped.put("partition", topicPartition.partition()); - wrapped.put("cluster", sourceClusterAlias); + wrapped.put(TOPIC_KEY, topicPartition.topic()); + wrapped.put(PARTITION_KEY, topicPartition.partition()); + wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias); return wrapped; } - static Map<String, Object> wrapOffset(long offset) { - return Collections.singletonMap("offset", offset); + public static Map<String, Object> wrapOffset(long offset) { + return Collections.singletonMap(OFFSET_KEY, offset); } - static TopicPartition unwrapPartition(Map<String, ?> wrapped) { - String topic = (String) wrapped.get("topic"); - int partition = (Integer) wrapped.get("partition"); + public static TopicPartition unwrapPartition(Map<String, ?> wrapped) { + String topic = (String) wrapped.get(TOPIC_KEY); + int partition = (Integer) wrapped.get(PARTITION_KEY); return new TopicPartition(topic, partition); } static Long unwrapOffset(Map<String, ?> wrapped) { - if (wrapped == null || wrapped.get("offset") == null) { + if (wrapped == null || wrapped.get(OFFSET_KEY) == null) { return -1L; } - return (Long) wrapped.get("offset"); + return (Long) wrapped.get(OFFSET_KEY); + } + + + /** + * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors. + * This method ensures that the key is present in the source partition map and that its value is a string. + * + * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map) + * @see SourceRecord#sourcePartition() + * + * @param sourcePartition the to-be-validated source partition; may not be null + * @param key the key to check for in the source partition; may be null Review Comment: It's unlikely that `Map::get` will lead to an NPE; it's optional for maps to refuse to allow null keys/values but neither `HashMap` nor `LinkedHashMap` have this restriction. 
But you're correct that no callers pass in null keys and I doubt we'll need support for that in the future, so will make the change 👍
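For reference on that point, here is a minimal, JDK-only sketch (not part of the PR itself) showing why `Map::get` with a null key does not throw for `HashMap` or `LinkedHashMap`: both implementations accept null keys, so a lookup simply misses. The class name and sample values below are illustrative only.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class NullKeyLookupDemo {
    public static void main(String[] args) {
        Map<String, Integer> hashMap = new HashMap<>();
        Map<String, Integer> linkedHashMap = new LinkedHashMap<>();
        hashMap.put("partition", 0);
        linkedHashMap.put("partition", 0);

        // Neither implementation rejects null keys, so lookups with a null key
        // simply miss instead of throwing a NullPointerException.
        System.out.println(hashMap.get(null));            // prints: null
        System.out.println(hashMap.containsKey(null));    // prints: false
        System.out.println(linkedHashMap.get(null));      // prints: null

        // A null key can even be stored and read back.
        hashMap.put(null, -1);
        System.out.println(hashMap.get(null));            // prints: -1

        // By contrast, null-hostile maps (e.g. the views returned by Map.of(...),
        // or a TreeMap using natural ordering) would throw on get(null).
    }
}
```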
########## connect/mirror/src/test/java/org/apache/kafka/connect/mirror/integration/MirrorConnectorsIntegrationBaseTest.java: ########## @@ -933,12 +938,94 @@ protected static void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster conne } } - private static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses) { + protected static void restartMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, List<Class<? extends Connector>> connectorClasses) { for (Class<? extends Connector> connector : connectorClasses) { connectCluster.restartConnectorAndTasks(connector.getSimpleName(), false, true, false); } } + + @SafeVarargs + protected static void resumeMirrorMakerConnectors(EmbeddedConnectCluster connectCluster, Class<? extends Connector>... connectorClasses) throws InterruptedException { + for (Class<? extends Connector> connectorClass : connectorClasses) { + connectCluster.resumeConnector(connectorClass.getSimpleName()); + } + for (Class<? extends Connector> connectorClass : connectorClasses) { + String connectorName = connectorClass.getSimpleName(); + connectCluster.assertions().assertConnectorAndExactlyNumTasksAreRunning( + connectorName, + 1, + "Connector '" + connectorName + "' and/or task did not resume in time" + ); + } + } + + protected static void alterMirrorMakerSourceConnectorOffsets(EmbeddedConnectCluster connectCluster, LongUnaryOperator alterOffset, String... topics) { + Set<String> topicsSet = new HashSet<>(Arrays.asList(topics)); + + String connectorName = MirrorSourceConnector.class.getSimpleName(); + connectCluster.stopConnector(connectorName); Review Comment: Good catch! I've added a call to `connectCluster.assertions().assertConnectorIsStopped` whenever we stop a connector in these tests.
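A rough sketch of that stop-then-assert pattern, as a helper that could sit alongside `resumeMirrorMakerConnectors` in `MirrorConnectorsIntegrationBaseTest`; this is not the actual change from the PR, the helper name is hypothetical, and the exact parameters of `assertConnectorIsStopped` (connector name plus failure message) are an assumption.

```java
// Hypothetical helper illustrating the pattern described above: stop a connector and
// then block until the Connect cluster actually reports it as stopped, so that later
// offset modifications don't race with a still-running connector.
// The (connectorName, detailMessage) signature of assertConnectorIsStopped is assumed here.
protected static void stopMirrorMakerConnector(EmbeddedConnectCluster connectCluster,
                                               Class<? extends Connector> connectorClass) throws InterruptedException {
    String connectorName = connectorClass.getSimpleName();
    connectCluster.stopConnector(connectorName);
    connectCluster.assertions().assertConnectorIsStopped(
            connectorName,
            "Connector '" + connectorName + "' did not stop in time"
    );
}
```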
########## connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorUtils.java: ########## @@ -65,27 +72,170 @@ static String encodeTopicPartition(TopicPartition topicPartition) { static Map<String, Object> wrapPartition(TopicPartition topicPartition, String sourceClusterAlias) { Map<String, Object> wrapped = new HashMap<>(); - wrapped.put("topic", topicPartition.topic()); - wrapped.put("partition", topicPartition.partition()); - wrapped.put("cluster", sourceClusterAlias); + wrapped.put(TOPIC_KEY, topicPartition.topic()); + wrapped.put(PARTITION_KEY, topicPartition.partition()); + wrapped.put(SOURCE_CLUSTER_KEY, sourceClusterAlias); return wrapped; } - static Map<String, Object> wrapOffset(long offset) { - return Collections.singletonMap("offset", offset); + public static Map<String, Object> wrapOffset(long offset) { + return Collections.singletonMap(OFFSET_KEY, offset); } - static TopicPartition unwrapPartition(Map<String, ?> wrapped) { - String topic = (String) wrapped.get("topic"); - int partition = (Integer) wrapped.get("partition"); + public static TopicPartition unwrapPartition(Map<String, ?> wrapped) { + String topic = (String) wrapped.get(TOPIC_KEY); + int partition = (Integer) wrapped.get(PARTITION_KEY); return new TopicPartition(topic, partition); } static Long unwrapOffset(Map<String, ?> wrapped) { - if (wrapped == null || wrapped.get("offset") == null) { + if (wrapped == null || wrapped.get(OFFSET_KEY) == null) { return -1L; } - return (Long) wrapped.get("offset"); + return (Long) wrapped.get(OFFSET_KEY); + } + + + /** + * Validate a specific key in a source partition that may be written to the offsets topic for one of the MM2 connectors. + * This method ensures that the key is present in the source partition map and that its value is a string. + * + * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map) + * @see SourceRecord#sourcePartition() + * + * @param sourcePartition the to-be-validated source partition; may not be null + * @param key the key to check for in the source partition; may be null + * + * @throws ConnectException if the offset is invalid + */ + static void validateSourcePartitionString(Map<String, ?> sourcePartition, String key) { + Objects.requireNonNull(sourcePartition, "Source partition may not be null"); + + if (!sourcePartition.containsKey(key)) + throw new ConnectException(String.format( + "Source partition %s is missing the '%s' key, which is required", + sourcePartition, + key + )); + + Object value = sourcePartition.get(key); + if (!(value instanceof String)) { + throw new ConnectException(String.format( + "Source partition %s has an invalid value %s for the '%s' key, which must be a string", + sourcePartition, + value, + key + )); + } + } + + /** + * Validate the {@link #PARTITION_KEY partition key} in a source partition that may be written to the offsets topic + * for one of the MM2 connectors. + * This method ensures that the key is present in the source partition map and that its value is a non-negative integer. + * <p/> + * Note that the partition key most likely refers to a partition in a Kafka topic, whereas the term "source partition" refers + * to a {@link SourceRecord#sourcePartition() source partition} that is stored in a Kafka Connect worker's internal offsets + * topic (or, if running in standalone mode, offsets file). 
+ * + * @see org.apache.kafka.connect.source.SourceConnector#alterOffsets(Map, Map) + * @see SourceRecord#sourcePartition() + * + * @param sourcePartition the to-be-validated source partition; may not be null + * + * @throws ConnectException if the offset is invalid Review Comment: Nice catch, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org