aho135 commented on code in PR #19372:
URL: https://github.com/apache/druid/pull/19372#discussion_r3230872042


##########
embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaBoundedSupervisorTest.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.testing.embedded.indexing;
+
+import org.apache.druid.common.utils.IdUtils;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.indexing.kafka.simulate.KafkaResource;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
+import org.apache.druid.indexing.seekablestream.supervisor.BoundedStreamConfig;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.testing.embedded.StreamIngestResource;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests for bounded Kafka supervisors (one-time ingestion with explicit 
start/end offsets).
+ */
+public class KafkaBoundedSupervisorTest extends StreamIndexTestBase
+{
+  private static final EmittingLogger log = new 
EmittingLogger(KafkaBoundedSupervisorTest.class);
+  private final KafkaResource kafkaServer = new KafkaResource();
+
+  @Override
+  protected StreamIngestResource<?> getStreamIngestResource()
+  {
+    return kafkaServer;
+  }
+
+  @Test
+  public void test_boundedSupervisor_ingestsDataAndCompletes()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+
+    // Publish records before creating supervisor
+    final int totalRecords = publish1kRecords(topic, false);
+
+    // Get the current end offsets for all partitions
+    Map<String, Long> endOffsets = kafkaServer.getPartitionOffsets(topic);
+    Assertions.assertEquals(2, endOffsets.size(), "Should have 2 partitions");
+
+    // Create bounded config with start offset 0 and current end offsets
+    Map<String, Long> startOffsets = new HashMap<>();
+    startOffsets.put("0", 0L);
+    startOffsets.put("1", 0L);
+
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(startOffsets, 
endOffsets);
+
+    // Create bounded supervisor
+    final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig
+    );
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Wait for records to be ingested
+    waitUntilPublishedRecordsAreIngested(totalRecords);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor.getId());
+
+    // Verify row count
+    verifyRowCount(totalRecords);
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisor.getId());
+    Assertions.assertEquals("COMPLETED", status.getState());
+    Assertions.assertTrue(status.isHealthy());
+  }
+
+  @Test
+  public void test_boundedSupervisor_withEmptyRange_completesImmediately()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 1);
+
+    // Publish some records
+    publish1kRecords(topic, false);
+
+    // Get current offset
+    Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic);
+    Long currentOffset = currentOffsets.get("0");
+
+    // Create bounded config with start == end (empty range)
+    Map<String, Long> offsets = new HashMap<>();
+    offsets.put("0", currentOffset);
+
+    BoundedStreamConfig boundedConfig = new BoundedStreamConfig(offsets, 
offsets);
+
+    // Create bounded supervisor
+    final KafkaSupervisorSpec supervisor = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig
+    );
+
+    cluster.callApi().postSupervisor(supervisor);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor.getId());
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status = 
cluster.callApi().getSupervisorStatus(supervisor.getId());
+    Assertions.assertEquals("COMPLETED", status.getState());
+  }
+
+  private KafkaSupervisorSpec createBoundedKafkaSupervisor(
+      KafkaResource kafkaServer,
+      String topic,
+      BoundedStreamConfig boundedConfig
+  )
+  {
+    return createKafkaSupervisor(kafkaServer)
+        .withIoConfig(io -> io
+            .withKafkaInputFormat(new JsonInputFormat(null, null, null, null, 
null))
+            .withBoundedStreamConfig(boundedConfig)
+        )
+        .build(dataSource, topic);
+  }
+
+  @Test
+  public void test_boundedSupervisor_withMismatchedMetadata_is_unhealthy()
+  {
+    final String topic = IdUtils.getRandomId();
+    kafkaServer.createTopicWithPartitions(topic, 2);
+    publish1kRecords(topic, false);
+
+    // Get the current end offsets for all partitions
+    Map<String, Long> currentOffsets = kafkaServer.getPartitionOffsets(topic);
+    Assertions.assertEquals(2, currentOffsets.size(), "Should have 2 
partitions");
+
+    // Create first bounded config - ingest only the first 100 records from 
each partition
+    Map<String, Long> startOffsets1 = new HashMap<>();
+    startOffsets1.put("0", 0L);
+    startOffsets1.put("1", 0L);
+
+    Map<String, Long> endOffsets1 = new HashMap<>();
+    endOffsets1.put("0", 100L);
+    endOffsets1.put("1", 100L);
+
+    BoundedStreamConfig boundedConfig1 = new 
BoundedStreamConfig(startOffsets1, endOffsets1);
+
+    // Create first bounded supervisor and run it to completion
+    final KafkaSupervisorSpec supervisor1 = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig1
+    );
+
+    cluster.callApi().postSupervisor(supervisor1);
+
+    // Wait for records to be ingested (approximately 200 records total from 
both partitions)
+    waitUntilPublishedRecordsAreIngested(200);
+
+    // Wait for supervisor to transition to COMPLETED state
+    waitForSupervisorToComplete(supervisor1.getId());
+
+    // Verify supervisor is in COMPLETED state
+    final SupervisorStatus status1 = 
cluster.callApi().getSupervisorStatus(supervisor1.getId());
+    Assertions.assertEquals("COMPLETED", status1.getState());
+
+    // Now try to create a second bounded supervisor with different bounded 
config on the same datasource
+    Map<String, Long> startOffsets2 = new HashMap<>();
+    startOffsets2.put("0", 50L);  // Different start offset
+    startOffsets2.put("1", 50L);
+
+    Map<String, Long> endOffsets2 = new HashMap<>();
+    endOffsets2.put("0", 200L);  // Different end offset
+    endOffsets2.put("1", 200L);
+
+    BoundedStreamConfig boundedConfig2 = new 
BoundedStreamConfig(startOffsets2, endOffsets2);
+
+    final KafkaSupervisorSpec supervisor2 = createBoundedKafkaSupervisor(
+        kafkaServer,
+        topic,
+        boundedConfig2
+    );
+
+    // Post the second supervisor (it should use the same supervisor 
ID/datasource)
+    cluster.callApi().postSupervisor(supervisor2);
+
+    // Wait for the supervisor to process and detect the metadata mismatch
+    // The exception we're testing for is thrown and logged, and causes the 
supervisor to become unhealthy
+    waitForSupervisorToBeUnhealthy(supervisor2.getId());
+
+    // Verify the supervisor is unhealthy
+    final SupervisorStatus status2 = 
cluster.callApi().getSupervisorStatus(supervisor2.getId());
+    Assertions.assertFalse(status2.isHealthy(), "Supervisor should be 
unhealthy after detecting metadata mismatch");
+    Assertions.assertEquals("UNHEALTHY_SUPERVISOR", status2.getState(), 
"Supervisor state should be UNHEALTHY_SUPERVISOR");

Review Comment:
   In the metadata-mismatch case, tasks are never actually spun up: the
exception is thrown before any tasks are created.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to