This is an automated email from the ASF dual-hosted git repository.
kfaraz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 0c849d6a390 Add builders for Kafka supervisor spec, ioConfig,
tuningConfig (#18460)
0c849d6a390 is described below
commit 0c849d6a3902ac77e31da7ebe993707c46c52949
Author: Kashif Faraz <[email protected]>
AuthorDate: Tue Sep 2 22:58:10 2025 +0530
Add builders for Kafka supervisor spec, ioConfig, tuningConfig (#18460)
Changes:
- Add `KafkaSupervisorSpecBuilder`
- Add `KafkaTuningConfigBuilder`
- Add `KafkaIOConfigBuilder`
- Use the above builders in tests
---
.../embedded/compact/AutoCompactionTest.java | 6 +-
.../embedded/compact/CompactionTestBase.java | 2 +-
.../embedded/indexing/IngestionSmokeTest.java | 49 +--
.../embedded/indexing/KafkaClusterMetricsTest.java | 76 +----
.../embedded/indexing/KafkaDataFormatsTest.java | 91 ++----
.../testing/embedded/indexing/MoreResources.java | 32 ++
.../embedded/msq/BaseRealtimeQueryTest.java | 68 +---
.../embedded/server/OverlordClientTest.java | 26 +-
.../kafka/supervisor/KafkaSupervisorSpec.java | 12 +-
.../kafka/KafkaIndexTaskTuningConfigTest.java | 48 +--
.../druid/indexing/kafka/KafkaSamplerSpecTest.java | 355 +++++----------------
.../simulate/EmbeddedKafkaSupervisorTest.java | 57 ++--
.../kafka/supervisor/KafkaIOConfigBuilder.java | 96 ++++++
.../supervisor/KafkaSupervisorSpecBuilder.java | 96 ++++++
.../kafka/supervisor/KafkaSupervisorSpecTest.java | 121 ++-----
.../kafka/supervisor/KafkaSupervisorTest.java | 239 +++-----------
.../KafkaSupervisorTuningConfigTest.java | 4 +-
.../kafka/supervisor/KafkaTuningConfigBuilder.java | 119 +++++++
indexing-service/pom.xml | 6 +
.../druid/indexing/common/task/TaskBuilder.java | 116 ++++---
.../indexing/common/task/TuningConfigBuilder.java | 147 +++++----
.../supervisor/SupervisorIOConfigBuilder.java | 164 ++++++++++
22 files changed, 930 insertions(+), 1000 deletions(-)
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java
index bd89df768a2..6ef481d210f 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/AutoCompactionTest.java
@@ -1616,17 +1616,17 @@ public class AutoCompactionTest extends
CompactionTestBase
}
}
- private <T extends TaskBuilder.IndexCommon<?, ?, ?>> void
loadData(Supplier<T> updatePayload)
+ private <T extends TaskBuilder.IndexCommon<?, ?, ?, ?>> void
loadData(Supplier<T> updatePayload)
{
loadData(updatePayload, null);
}
- private <T extends TaskBuilder.IndexCommon<?, ?, ?>> void loadData(
+ private <T extends TaskBuilder.IndexCommon<?, ?, ?, ?>> void loadData(
Supplier<T> taskPayloadSupplier,
GranularitySpec granularitySpec
)
{
- final TaskBuilder.IndexCommon<?, ?, ?> taskBuilder =
taskPayloadSupplier.get();
+ final TaskBuilder.IndexCommon<?, ?, ?, ?> taskBuilder =
taskPayloadSupplier.get();
taskBuilder.dataSource(fullDatasourceName);
if (granularitySpec != null) {
taskBuilder.granularitySpec(granularitySpec);
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
index 109c9c12aae..6719c0b5496 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionTestBase.java
@@ -67,7 +67,7 @@ public abstract class CompactionTestBase extends
EmbeddedClusterTestBase
/**
* Creates and runs a task for the current {@link #dataSource}.
*/
- protected String runTask(TaskBuilder<?, ?, ?> taskBuilder)
+ protected String runTask(TaskBuilder<?, ?, ?, ?> taskBuilder)
{
final String taskId = IdUtils.getRandomId();
cluster.callApi().runTask(taskBuilder.dataSource(dataSource).withId(taskId),
overlord);
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
index af9c3d21c8e..668079065c4 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/IngestionSmokeTest.java
@@ -22,7 +22,6 @@ package org.apache.druid.testing.embedded.indexing;
import com.google.common.collect.ImmutableList;
import org.apache.druid.common.utils.IdUtils;
import org.apache.druid.data.input.impl.CsvInputFormat;
-import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
@@ -32,9 +31,7 @@ import org.apache.druid.indexing.common.task.TaskBuilder;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.overlord.Segments;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.DateTimes;
@@ -44,7 +41,6 @@ import
org.apache.druid.java.util.common.parsers.CloseableIterator;
import
org.apache.druid.metadata.storage.postgresql.PostgreSQLMetadataStorageModule;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.query.http.SqlTaskStatus;
-import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -304,7 +300,7 @@ public class IngestionSmokeTest extends
EmbeddedClusterTestBase
(CloseableIterator<TaskStatusPlus>)
cluster.callApi().onLeaderOverlord(o -> o.taskStatuses(null,
dataSource, 1))
);
- Assertions.assertEquals(1, taskStatuses.size());
+ Assertions.assertFalse(taskStatuses.isEmpty());
Assertions.assertEquals(TaskState.RUNNING,
taskStatuses.get(0).getStatusCode());
// Suspend the supervisor and verify the state
@@ -317,39 +313,16 @@ public class IngestionSmokeTest extends
EmbeddedClusterTestBase
private KafkaSupervisorSpec createKafkaSupervisor(String topic)
{
- return new KafkaSupervisorSpec(
- null,
- null,
- DataSchema.builder()
- .withDataSource(dataSource)
- .withTimestamp(new TimestampSpec("timestamp", null, null))
- .withDimensions(DimensionsSpec.EMPTY)
- .build(),
- createTuningConfig(),
- new KafkaSupervisorIOConfig(
- topic,
- null,
- new CsvInputFormat(List.of("timestamp", "item"), null, null,
false, 0, false),
- null, null,
- null,
- kafkaServer.consumerProperties(),
- null, null, null, null, null,
- true,
- null, null, null, null, null, null, null, null
- ),
- null, null, null, null, null, null, null, null, null, null, null
- );
- }
-
- private KafkaSupervisorTuningConfig createTuningConfig()
- {
- return new KafkaSupervisorTuningConfig(
- null,
- null, null, null,
- 1,
- null, null, null, null, null, null, null, null, null, null,
- null, null, null, null, null, null, null, null, null, null, null
- );
+ return MoreResources.Supervisor.KAFKA_JSON
+ .get()
+ .withDataSchema(schema -> schema.withTimestamp(new
TimestampSpec("timestamp", null, null)))
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withInputFormat(new CsvInputFormat(List.of("timestamp",
"item"), null, null, false, 0, false))
+ )
+ .withTuningConfig(tuningConfig ->
tuningConfig.withMaxRowsPerSegment(1))
+ .build(dataSource, topic);
}
private List<ProducerRecord<byte[], byte[]>> generateRecordsForTopic(
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
index 2cf03e99ce9..5a977b861e8 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaClusterMetricsTest.java
@@ -20,24 +20,17 @@
package org.apache.druid.testing.embedded.indexing;
import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.emitter.kafka.KafkaEmitter;
import org.apache.druid.emitter.kafka.KafkaEmitterModule;
-import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexing.compact.CompactionSupervisorSpec;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.overlord.Segments;
-import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.DruidMetrics;
import org.apache.druid.rpc.UpdateResponse;
import org.apache.druid.rpc.indexing.OverlordClient;
-import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.DruidNode;
import org.apache.druid.server.coordinator.ClusterCompactionConfig;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
@@ -143,16 +136,12 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
final int expectedSegmentsHandedOff = 10;
final int taskCount = 5;
- final int taskDurationMillis = 1_000;
- final int taskCompletionTimeoutMillis = 10_000;
// Submit and start a supervisor
final String supervisorId = dataSource + "_supe";
final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(
supervisorId,
taskCount,
- taskDurationMillis,
- taskCompletionTimeoutMillis,
maxRowsPerSegment
);
@@ -194,16 +183,12 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
final int compactedMaxRowsPerSegment = 5000;
final int taskCount = 2;
- final int taskDurationMillis = 500;
- final int taskCompletionTimeoutMillis = 5_000;
// Submit and start a supervisor
final String supervisorId = dataSource + "_supe";
final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(
supervisorId,
taskCount,
- taskDurationMillis,
- taskCompletionTimeoutMillis,
maxRowsPerSegment
);
cluster.callApi().postSupervisor(kafkaSupervisorSpec);
@@ -315,16 +300,12 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
final int compactedMaxRowsPerSegment = 5000;
final int taskCount = 2;
- final int taskDurationMillis = 500;
- final int taskCompletionTimeoutMillis = 5_000;
// Submit and start a supervisor
final String supervisorId = dataSource + "_supe";
final KafkaSupervisorSpec kafkaSupervisorSpec = createKafkaSupervisor(
supervisorId,
taskCount,
- taskDurationMillis,
- taskCompletionTimeoutMillis,
maxRowsPerSegment
);
cluster.callApi().postSupervisor(kafkaSupervisorSpec);
@@ -382,7 +363,7 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
* SELECTs the total count of the given metric in the {@link #dataSource} and
* verifies it against the metrics actually emitted by the server.
*/
- private void verifyIngestedMetricCountMatchesEmittedCount(String metricName,
EmbeddedDruidServer server)
+ private void verifyIngestedMetricCountMatchesEmittedCount(String metricName,
EmbeddedDruidServer<?> server)
{
// Get the value of the metric from the datasource
final DruidNode selfNode = server.bindings().selfNode();
@@ -404,52 +385,19 @@ public class KafkaClusterMetricsTest extends
EmbeddedClusterTestBase
private KafkaSupervisorSpec createKafkaSupervisor(
String supervisorId,
int taskCount,
- int taskDurationMillis,
- int taskCompletionTimeoutMillis,
int maxRowsPerSegment
)
{
- final Period startDelay = Period.millis(10);
- final Period supervisorRunPeriod = Period.millis(500);
- final boolean useEarliestOffset = true;
-
- return new KafkaSupervisorSpec(
- supervisorId,
- null,
- DataSchema.builder()
- .withDataSource(dataSource)
- .withTimestamp(new TimestampSpec("timestamp", "iso", null))
- .withGranularity(new
UniformGranularitySpec(Granularities.HOUR, null, null))
- .withDimensions(DimensionsSpec.EMPTY)
- .build(),
- createTuningConfig(maxRowsPerSegment),
- new KafkaSupervisorIOConfig(
- TOPIC,
- null,
- new JsonInputFormat(null, null, null, null, null),
- null,
- taskCount,
- Period.millis(taskDurationMillis),
- kafkaServer.consumerProperties(),
- null, null, null,
- startDelay,
- supervisorRunPeriod,
- useEarliestOffset,
- Period.millis(taskCompletionTimeoutMillis),
- null, null, null, null, null, null, null
- ),
- null, null, null, null, null, null, null, null, null, null, null
- );
- }
-
- private KafkaSupervisorTuningConfig createTuningConfig(int maxRowsPerSegment)
- {
- return new KafkaSupervisorTuningConfig(
- null,
- null, null, null,
- maxRowsPerSegment,
- null, null, null, null, null, null, null, null, null, null,
- null, null, null, null, null, null, null, null, null, null, null
- );
+ return MoreResources.Supervisor.KAFKA_JSON
+ .get()
+ .withDataSchema(schema -> schema.withTimestamp(new
TimestampSpec("timestamp", "iso", null)))
+ .withTuningConfig(tuningConfig ->
tuningConfig.withMaxRowsPerSegment(maxRowsPerSegment))
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withTaskCount(taskCount)
+ )
+ .withId(supervisorId)
+ .build(dataSource, TOPIC);
}
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
index c8fff7347d5..7603700a55d 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/KafkaDataFormatsTest.java
@@ -44,13 +44,10 @@ import
org.apache.druid.data.input.protobuf.ProtobufInputRowParser;
import
org.apache.druid.data.input.protobuf.SchemaRegistryBasedProtobufBytesDecoder;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.query.DruidMetrics;
-import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -581,70 +578,38 @@ public class KafkaDataFormatsTest extends
EmbeddedClusterTestBase
parser,
new TypeReference<>() {}
);
- return new KafkaSupervisorSpec(
- supervisorId,
- null,
- DataSchema.builder()
- .withDataSource(dataSource)
- .withTimestamp(new TimestampSpec("timestamp", null, null))
- .withDimensions(DimensionsSpec.EMPTY)
- .withParserMap(parserMap)
- .build(),
- createTuningConfig(),
- new KafkaSupervisorIOConfig(
- topic,
- null,
- null,
- null, null,
- Period.millis(100),
- kafkaServer.consumerProperties(),
- null, null, null,
- Period.millis(10),
- Period.millis(10),
- true,
- null, null, null, null, null, null, null, null
- ),
- null, null, null, null, null, null, null, null, null, null, null
- );
+ return MoreResources.Supervisor.KAFKA_JSON
+ .get()
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(new TimestampSpec("timestamp", null, null))
+ .withParserMap(parserMap)
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withInputFormat(null)
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withSupervisorRunPeriod(Period.millis(10))
+ )
+ .withTuningConfig(tuningConfig ->
tuningConfig.withMaxRowsPerSegment(1))
+ .withId(supervisorId)
+ .build(dataSource, topic);
}
private KafkaSupervisorSpec createKafkaSupervisor(String supervisorId,
String topic, InputFormat inputFormat)
{
- return new KafkaSupervisorSpec(
- supervisorId,
- null,
- DataSchema.builder()
- .withDataSource(dataSource)
- .withTimestamp(new TimestampSpec("timestamp", null, null))
- .withDimensions(DimensionsSpec.EMPTY)
- .build(),
- createTuningConfig(),
- new KafkaSupervisorIOConfig(
- topic,
- null,
- inputFormat,
- null, null,
- Period.millis(100),
- kafkaServer.consumerProperties(),
- null, null, null,
- Period.millis(10),
- Period.millis(10),
- true,
- null, null, null, null, null, null, null, null
- ),
- null, null, null, null, null, null, null, null, null, null, null
- );
- }
-
- private KafkaSupervisorTuningConfig createTuningConfig()
- {
- return new KafkaSupervisorTuningConfig(
- null,
- null, null, null,
- 1,
- null, null, null, null, null, null, null, null, null, null,
- null, null, null, null, null, null, null, null, null, null, null
- );
+ return MoreResources.Supervisor.KAFKA_JSON
+ .get()
+ .withDataSchema(schema -> schema.withTimestamp(new
TimestampSpec("timestamp", null, null)))
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withInputFormat(inputFormat)
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withSupervisorRunPeriod(Period.millis(10))
+ )
+ .withTuningConfig(tuningConfig ->
tuningConfig.withMaxRowsPerSegment(1))
+ .withId(supervisorId)
+ .build(dataSource, topic);
}
private Map<String, Object> createWikipediaAvroSchemaMap()
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
index 1228da0ab83..b50d4f2e601 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/indexing/MoreResources.java
@@ -19,12 +19,17 @@
package org.apache.druid.testing.embedded.indexing;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexing.common.task.TaskBuilder;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
+import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.DoubleSumAggregatorFactory;
import
org.apache.druid.query.aggregation.datasketches.hll.HllSketchBuildAggregatorFactory;
import
org.apache.druid.query.aggregation.datasketches.quantiles.DoublesSketchAggregatorFactory;
import
org.apache.druid.query.aggregation.datasketches.theta.SketchMergeAggregatorFactory;
+import org.joda.time.Period;
import java.util.function.Supplier;
@@ -81,6 +86,33 @@ public class MoreResources
.appendToExisting(false);
}
+ /**
+ * Supervisor spec builder.
+ */
+ public static class Supervisor
+ {
+ /**
+ * A minimal Kafka supervisor spec that auto-discovers dimensions, has a
task
+ * duration of 500ms and creates HOUR-ly segments.
+ */
+ public static Supplier<KafkaSupervisorSpecBuilder> KAFKA_JSON = () ->
+ new KafkaSupervisorSpecBuilder()
+ .withDataSchema(
+ schema -> schema
+ .withDimensions(DimensionsSpec.EMPTY)
+ .withGranularity(new
UniformGranularitySpec(Granularities.HOUR, null, null))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withJsonInputFormat()
+ .withTaskDuration(Period.millis(500))
+ .withStartDelay(Period.millis(10))
+ .withSupervisorRunPeriod(Period.millis(500))
+ .withUseEarliestSequenceNumber(true)
+ .withCompletionTimeout(Period.seconds(5))
+ );
+ }
+
public static class ProbufData
{
public static final String WIKI_PROTOBUF_BYTES_DECODER_RESOURCE =
"data/protobuf/wikipedia.desc";
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
index 0d82c6f7850..1934786afda 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/BaseRealtimeQueryTest.java
@@ -22,14 +22,12 @@ package org.apache.druid.testing.embedded.msq;
import com.fasterxml.jackson.core.JsonProcessingException;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import org.apache.druid.data.input.impl.DimensionsSpec;
-import org.apache.druid.data.input.impl.JsonInputFormat;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
import org.apache.druid.indexing.kafka.simulate.KafkaResource;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
@@ -37,9 +35,9 @@ import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexCursorFactory;
import org.apache.druid.segment.TestHelper;
import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.testing.embedded.EmbeddedClusterApis;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.joda.time.Period;
@@ -145,54 +143,20 @@ public class BaseRealtimeQueryTest extends
EmbeddedClusterTestBase
private KafkaSupervisorSpec createKafkaSupervisor()
{
- final Period startDelay = Period.millis(10);
- final Period supervisorRunPeriod = Period.millis(500);
- final boolean useEarliestOffset = true;
-
- return new KafkaSupervisorSpec(
- dataSource,
- null,
- DataSchema.builder()
- .withDataSource(dataSource)
- .withTimestamp(new TimestampSpec("__time", "auto", null))
- .withGranularity(new
UniformGranularitySpec(Granularities.DAY, null, null))
-
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
- .build(),
- null,
- new KafkaSupervisorIOConfig(
- topic,
- null,
- new JsonInputFormat(null, null, null, null, null),
- null,
- TASK_COUNT,
- TASK_DURATION,
- kafka.consumerProperties(),
- null,
- null,
- null,
- startDelay,
- supervisorRunPeriod,
- useEarliestOffset,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- ),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
+ return MoreResources.Supervisor.KAFKA_JSON
+ .get()
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(new TimestampSpec("__time", "auto", null))
+ .withGranularity(new UniformGranularitySpec(Granularities.DAY,
null, null))
+
.withDimensions(DimensionsSpec.builder().useSchemaDiscovery(true).build())
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withTaskCount(TASK_COUNT)
+ .withTaskDuration(TASK_DURATION)
+ .withConsumerProperties(kafka.consumerProperties())
+ )
+ .build(dataSource, topic);
}
}
diff --git
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java
index 36de4c072a7..5cb0398caed 100644
---
a/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java
+++
b/embedded-tests/src/test/java/org/apache/druid/testing/embedded/server/OverlordClientTest.java
@@ -25,12 +25,11 @@ import com.google.common.util.concurrent.ListenableFuture;
import org.apache.druid.client.indexing.IndexingTotalWorkerCapacityInfo;
import org.apache.druid.client.indexing.IndexingWorkerInfo;
import org.apache.druid.common.utils.IdUtils;
-import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.common.task.NoopTask;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.Intervals;
@@ -40,11 +39,11 @@ import org.apache.druid.rpc.HttpResponseException;
import org.apache.druid.rpc.indexing.OverlordClient;
import org.apache.druid.rpc.indexing.SegmentUpdateResponse;
import org.apache.druid.segment.TestDataSource;
-import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.http.SegmentsToUpdateFilter;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
import org.apache.druid.testing.embedded.EmbeddedIndexer;
import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.indexing.MoreResources;
import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.druid.timeline.SegmentId;
import org.junit.jupiter.api.Assertions;
@@ -213,22 +212,11 @@ public class OverlordClientTest extends
EmbeddedClusterTestBase
@Test
public void test_postSupervisor_fails_ifRequiredExtensionIsNotLoaded()
{
- final KafkaSupervisorSpec kafkaSupervisor = new KafkaSupervisorSpec(
- null,
- null,
- DataSchema.builder().withDataSource(dataSource).build(),
- null,
- new KafkaSupervisorIOConfig(
- "topic",
- null,
- new JsonInputFormat(null, null, null, null, null),
- null, null, null,
- Map.of("bootstrap.servers", "localhost:9092"),
- null, null, null, null, null, null, null, null, null, null, null,
null, null, null
- ),
- Map.of(),
- null, null, null, null, null, null, null, null, null, null
- );
+ final KafkaSupervisorSpec kafkaSupervisor =
MoreResources.Supervisor.KAFKA_JSON
+ .get()
+ .withDataSchema(schema -> schema.withTimestamp(new TimestampSpec(null,
null, null)))
+ .withIoConfig(ioConfig ->
ioConfig.withConsumerProperties(Map.of("bootstrap.servers", "localhost")))
+ .build(dataSource, "topic");
final Exception exception = Assertions.assertThrows(
Exception.class,
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
index f89a6a1f965..b607ade1acf 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpec.java
@@ -24,6 +24,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.common.config.Configs;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.InvalidInput;
import org.apache.druid.guice.annotations.Json;
@@ -74,14 +75,9 @@ public class KafkaSupervisorSpec extends
SeekableStreamSupervisorSpec
{
super(
id,
- ingestionSchema != null
- ? ingestionSchema
- : new KafkaSupervisorIngestionSpec(
- dataSchema,
- ioConfig,
- tuningConfig != null
- ? tuningConfig
- : KafkaSupervisorTuningConfig.defaultConfig()
+ Configs.valueOrDefault(
+ ingestionSchema,
+ new KafkaSupervisorIngestionSpec(dataSchema, ioConfig,
tuningConfig)
),
context,
suspended,
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 132b65a15cb..54ae10080a7 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaTuningConfigBuilder;
import
org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
@@ -70,7 +71,7 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertEquals(IndexSpec.DEFAULT,
config.getIndexSpecForIntermediatePersists());
- Assert.assertEquals(false, config.isReportParseExceptions());
+ Assert.assertFalse(config.isReportParseExceptions());
Assert.assertEquals(Duration.ofMinutes(15).toMillis(),
config.getHandoffConditionTimeout());
Assert.assertEquals(1, config.getNumPersistThreads());
Assert.assertEquals(-1, config.getMaxColumnsToMerge());
@@ -130,34 +131,19 @@ public class KafkaIndexTaskTuningConfigTest
@Test
public void testConvert()
{
- KafkaSupervisorTuningConfig original = new KafkaSupervisorTuningConfig(
- null,
- 1,
- null,
- null,
- 2,
- 10L,
- new Period("PT3S"),
- 4,
- IndexSpec.DEFAULT,
- IndexSpec.DEFAULT,
- true,
- 5L,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- 2,
- 5,
- false
- );
+ KafkaSupervisorTuningConfig original = new KafkaTuningConfigBuilder()
+ .withIntermediatePersistPeriod(new Period("PT3S"))
+ .withHandoffConditionTimeout(5L)
+ .withNumPersistThreads(2)
+ .withMaxRowsInMemory(1)
+ .withMaxRowsPerSegment(2)
+ .withMaxTotalRows(10L)
+ .withMaxPendingPersists(4)
+ .withIndexSpec(IndexSpec.DEFAULT)
+ .withIndexSpecForIntermediatePersists(IndexSpec.DEFAULT)
+ .withReportParseExceptions(true)
+ .withMaxColumnsToMerge(5)
+ .build();
KafkaIndexTaskTuningConfig copy = original.convertToTaskTuningConfig();
Assert.assertEquals(original.getAppendableIndexSpec(),
copy.getAppendableIndexSpec());
@@ -169,7 +155,7 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertNull(copy.getBasePersistDirectory());
Assert.assertEquals(4, copy.getMaxPendingPersists());
Assert.assertEquals(IndexSpec.DEFAULT, copy.getIndexSpec());
- Assert.assertEquals(true, copy.isReportParseExceptions());
+ Assert.assertTrue(copy.isReportParseExceptions());
Assert.assertEquals(5L, copy.getHandoffConditionTimeout());
Assert.assertEquals(2, copy.getNumPersistThreads());
Assert.assertEquals(5, copy.getMaxColumnsToMerge());
@@ -207,7 +193,7 @@ public class KafkaIndexTaskTuningConfigTest
TestModifiedKafkaIndexTaskTuningConfig deserialized =
mapper.readValue(serialized,
TestModifiedKafkaIndexTaskTuningConfig.class);
- Assert.assertEquals(null, deserialized.getExtra());
+ Assert.assertNull(deserialized.getExtra());
Assert.assertEquals(base.getAppendableIndexSpec(),
deserialized.getAppendableIndexSpec());
Assert.assertEquals(base.getMaxRowsInMemory(),
deserialized.getMaxRowsInMemory());
Assert.assertEquals(base.getMaxBytesInMemory(),
deserialized.getMaxBytesInMemory());
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
index 5e21fd0aa7f..872752b7927 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaSamplerSpecTest.java
@@ -34,10 +34,9 @@ import org.apache.druid.data.input.impl.LongDimensionSchema;
import org.apache.druid.data.input.impl.StringDimensionSchema;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
-import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
import org.apache.druid.indexing.kafka.test.TestBroker;
import org.apache.druid.indexing.overlord.sampler.InputSourceSampler;
import org.apache.druid.indexing.overlord.sampler.SamplerConfig;
@@ -65,12 +64,12 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.function.Consumer;
import java.util.regex.Pattern;
public class KafkaSamplerSpecTest extends InitializedNullHandlingTest
@@ -80,9 +79,10 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
private static final ObjectMapper OBJECT_MAPPER =
TestHelper.makeJsonMapper();
private static final String TOPIC = "sampling";
- private static final DataSchema DATA_SCHEMA =
- DataSchema.builder()
- .withDataSource("test_ds")
+ private static final String DATASOURCE = "test_ds";
+ private static final Consumer<DataSchema.Builder> DATA_SCHEMA =
+ schema ->
+ schema.withDataSource(DATASOURCE)
.withTimestamp(new TimestampSpec("timestamp", "iso", null))
.withDimensions(
new StringDimensionSchema("dim1"),
@@ -97,13 +97,13 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
)
.withGranularity(
new UniformGranularitySpec(Granularities.DAY,
Granularities.NONE, null)
- )
- .build();
+ );
- private static final DataSchema DATA_SCHEMA_KAFKA_TIMESTAMP =
- DataSchema.builder(DATA_SCHEMA)
- .withTimestamp(new TimestampSpec("kafka.timestamp", "iso",
null))
- .build();
+ private static final Consumer<DataSchema.Builder>
DATA_SCHEMA_KAFKA_TIMESTAMP =
+ schema -> {
+ DATA_SCHEMA.accept(schema);
+ schema.withTimestamp(new TimestampSpec("kafka.timestamp", "iso",
null));
+ };
private static TestingCluster zkServer;
private static TestBroker kafkaServer;
@@ -142,46 +142,15 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
{
insertData(generateRecords(TOPIC));
- KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
- null,
- null,
- DATA_SCHEMA,
- null,
- new KafkaSupervisorIOConfig(
- TOPIC,
- null,
- new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
- null,
- null,
- null,
- kafkaServer.consumerProperties(),
- null,
- null,
- null,
- null,
- null,
- true,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- false
- ),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
+ KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+ .withDataSchema(DATA_SCHEMA)
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withJsonInputFormat()
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withUseEarliestSequenceNumber(true)
+ )
+ .build(DATASOURCE, TOPIC);
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
@@ -198,46 +167,15 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
{
insertData(generateRecords(TOPIC));
- KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
- null,
- null,
- DATA_SCHEMA,
- null,
- new KafkaSupervisorIOConfig(
- null,
- Pattern.quote(TOPIC),
- new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
- null,
- null,
- null,
- kafkaServer.consumerProperties(),
- null,
- null,
- null,
- null,
- null,
- true,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- false
- ),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
+ KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+ .withDataSchema(DATA_SCHEMA)
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withJsonInputFormat()
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withUseEarliestSequenceNumber(true)
+ )
+ .buildWithTopicPattern(DATASOURCE, Pattern.quote(TOPIC));
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
@@ -254,55 +192,15 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
{
insertData(generateRecords(TOPIC));
- KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
- null,
- null,
- DATA_SCHEMA_KAFKA_TIMESTAMP,
- null,
- new KafkaSupervisorIOConfig(
- TOPIC,
- null,
- new KafkaInputFormat(
- null,
- null,
- new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null,
null),
- null,
- null,
- null,
- null
- ),
-
- null,
- null,
- null,
- kafkaServer.consumerProperties(),
- null,
- null,
- null,
- null,
- null,
- true,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- false
- ),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
+ KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+ .withDataSchema(DATA_SCHEMA_KAFKA_TIMESTAMP)
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withKafkaInputFormat(new JsonInputFormat(null, null, null,
null, null))
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withUseEarliestSequenceNumber(true)
+ )
+ .build(DATASOURCE, TOPIC);
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
@@ -340,7 +238,7 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
}
@Test
- public void testWithInputRowParser() throws IOException
+ public void testWithInputRowParser()
{
insertData(generateRecords(TOPIC));
@@ -357,59 +255,24 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
);
InputRowParser parser = new StringInputRowParser(new
JSONParseSpec(timestampSpec, dimensionsSpec, JSONPathSpec.DEFAULT, null, null),
"UTF8");
- DataSchema dataSchema = DataSchema.builder()
- .withDataSource("test_ds")
- .withParserMap(
-
objectMapper.readValue(objectMapper.writeValueAsBytes(parser), Map.class)
- )
- .withAggregators(
- new
DoubleSumAggregatorFactory("met1sum", "met1"),
- new CountAggregatorFactory("rows")
- )
- .withGranularity(new
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null))
- .withObjectMapper(objectMapper)
- .build();
-
- KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
- null,
- null,
- dataSchema,
- null,
- new KafkaSupervisorIOConfig(
- TOPIC,
- null,
- null,
- null,
- null,
- null,
- kafkaServer.consumerProperties(),
- null,
- null,
- null,
- null,
- null,
- true,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- false
- ),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
+ Consumer<DataSchema.Builder> dataSchema =
+ builder -> builder.withDataSource("test_ds")
+ .withParserMap(objectMapper.convertValue(parser,
Map.class))
+ .withAggregators(
+ new DoubleSumAggregatorFactory("met1sum",
"met1"),
+ new CountAggregatorFactory("rows")
+ )
+ .withGranularity(new
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null))
+ .withObjectMapper(objectMapper);
+
+ KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+ .withDataSchema(dataSchema)
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withUseEarliestSequenceNumber(true)
+ )
+ .build(DATASOURCE, TOPIC);
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
@@ -554,49 +417,14 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
@Test
public void testInvalidKafkaConfig()
{
- KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
- null,
- null,
- DATA_SCHEMA,
- null,
- new KafkaSupervisorIOConfig(
- TOPIC,
- null,
- new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
- null,
- null,
- null,
-
- // invalid bootstrap server
- ImmutableMap.of("bootstrap.servers", "127.0.0.1"),
-
- null,
- null,
- null,
- null,
- null,
- true,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- false
- ),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
+ KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withJsonInputFormat()
+ .withConsumerProperties(Map.of("bootstrap.servers", "invalid"))
+ .withUseEarliestSequenceNumber(true)
+ )
+ .build(DATASOURCE, TOPIC);
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
@@ -613,49 +441,14 @@ public class KafkaSamplerSpecTest extends
InitializedNullHandlingTest
@Test
public void testGetInputSourceResources()
{
- KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpec(
- null,
- null,
- DATA_SCHEMA,
- null,
- new KafkaSupervisorIOConfig(
- TOPIC,
- null,
- new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
- null,
- null,
- null,
-
- // invalid bootstrap server
- ImmutableMap.of("bootstrap.servers", "127.0.0.1"),
-
- null,
- null,
- null,
- null,
- null,
- true,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- false
- ),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
+ KafkaSupervisorSpec supervisorSpec = new KafkaSupervisorSpecBuilder()
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withJsonInputFormat()
+ .withConsumerProperties(Map.of("bootstrap.servers", "invalid"))
+ .withUseEarliestSequenceNumber(true)
+ )
+ .build(DATASOURCE, TOPIC);
KafkaSamplerSpec samplerSpec = new KafkaSamplerSpec(
supervisorSpec,
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
index 058f3ffa736..a5918f33821 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedKafkaSupervisorTest.java
@@ -27,16 +27,14 @@ import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatusPlus;
import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorIOConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
-import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
import org.apache.druid.indexing.overlord.supervisor.SupervisorStatus;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.metadata.LockFilterPolicy;
import org.apache.druid.query.DruidMetrics;
-import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.testing.embedded.EmbeddedBroker;
import org.apache.druid.testing.embedded.EmbeddedCoordinator;
import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
@@ -47,7 +45,6 @@ import
org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.joda.time.DateTime;
import org.joda.time.Interval;
-import org.joda.time.Period;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -147,39 +144,25 @@ public class EmbeddedKafkaSupervisorTest extends
EmbeddedClusterTestBase
private KafkaSupervisorSpec createKafkaSupervisor(String supervisorId,
String topic)
{
- return new KafkaSupervisorSpec(
- supervisorId,
- null,
- DataSchema.builder()
- .withDataSource(dataSource)
- .withTimestamp(new TimestampSpec("timestamp", null, null))
- .withDimensions(DimensionsSpec.EMPTY)
- .build(),
- createTuningConfig(),
- new KafkaSupervisorIOConfig(
- topic,
- null,
- new CsvInputFormat(List.of("timestamp", "item"), null, null,
false, 0, false),
- null, null,
- null,
- kafkaServer.consumerProperties(),
- null, null, null, null, null,
- true,
- null, null, null, null, null, null, null, null
- ),
- null, null, null, null, null, null, null, null, null, null, null
- );
- }
-
- private KafkaSupervisorTuningConfig createTuningConfig()
- {
- return new KafkaSupervisorTuningConfig(
- null,
- null, null, null,
- 1,
- null, null, null, null, null, null, null, null, null, null,
- null, null, null, null, new Period("PT2S"), null, null, null, null,
null, true
- );
+ return new KafkaSupervisorSpecBuilder()
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(new TimestampSpec("timestamp", null, null))
+ .withDimensions(DimensionsSpec.EMPTY)
+ )
+ .withTuningConfig(
+ tuningConfig -> tuningConfig
+ .withMaxRowsPerSegment(1)
+ .withReleaseLocksOnHandoff(true)
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withInputFormat(new CsvInputFormat(List.of("timestamp",
"item"), null, null, false, 0, false))
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withUseEarliestSequenceNumber(true)
+ )
+ .withId(supervisorId)
+ .build(dataSource, topic);
}
private List<ProducerRecord<byte[], byte[]>> generateRecordsForTopic(
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
new file mode 100644
index 00000000000..1e0b12eb2a0
--- /dev/null
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaIOConfigBuilder.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.supervisor;
+
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.kafkainput.KafkaInputFormat;
+import
org.apache.druid.indexing.seekablestream.supervisor.SupervisorIOConfigBuilder;
+
+import java.util.Map;
+
+/**
+ * Builder for {@link KafkaSupervisorIOConfig}.
+ */
+public class KafkaIOConfigBuilder extends
SupervisorIOConfigBuilder<KafkaIOConfigBuilder, KafkaSupervisorIOConfig>
+{
+ private String topic;
+ private String topicPattern;
+ private Map<String, Object> consumerProperties;
+
+ public KafkaIOConfigBuilder withTopic(String topic)
+ {
+ this.topic = topic;
+ return this;
+ }
+
+ public KafkaIOConfigBuilder withTopicPattern(String topicPattern)
+ {
+ this.topicPattern = topicPattern;
+ return this;
+ }
+
+ public KafkaIOConfigBuilder withConsumerProperties(Map<String, Object>
consumerProperties)
+ {
+ this.consumerProperties = consumerProperties;
+ return this;
+ }
+
+ public KafkaIOConfigBuilder withKafkaInputFormat(InputFormat valueFormat)
+ {
+ this.inputFormat = new KafkaInputFormat(
+ null,
+ null,
+ valueFormat,
+ null,
+ null,
+ null,
+ null
+ );
+ return this;
+ }
+
+ @Override
+ public KafkaSupervisorIOConfig build()
+ {
+ return new KafkaSupervisorIOConfig(
+ topic,
+ topicPattern,
+ inputFormat,
+ replicas,
+ taskCount,
+ taskDuration,
+ consumerProperties,
+ autoScalerConfig,
+ lagAggregator,
+ null,
+ startDelay,
+ period,
+ useEarliestSequenceNumber,
+ completionTimeout,
+ lateMessageRejectionPeriod,
+ earlyMessageRejectionPeriod,
+ lateMessageRejectionStartDateTime,
+ null,
+ idleConfig,
+ stopTaskCount,
+ null
+ );
+ }
+}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
new file mode 100644
index 00000000000..ce36c0b7dc0
--- /dev/null
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecBuilder.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.supervisor;
+
+import org.apache.druid.segment.indexing.DataSchema;
+
+import java.util.function.Consumer;
+
+/**
+ * Builder for a {@link KafkaSupervisorSpec}, which can be sent over a POST
call
+ * to the Overlord.
+ */
+public class KafkaSupervisorSpecBuilder
+{
+ private String id;
+ private final DataSchema.Builder dataSchema = new DataSchema.Builder();
+ private final KafkaIOConfigBuilder ioConfig = new KafkaIOConfigBuilder();
+ private final KafkaTuningConfigBuilder tuningConfig = new
KafkaTuningConfigBuilder();
+
+ public KafkaSupervisorSpecBuilder
withDataSchema(Consumer<DataSchema.Builder> updateDataSchema)
+ {
+ updateDataSchema.accept(this.dataSchema);
+ return this;
+ }
+
+ public KafkaSupervisorSpecBuilder
withTuningConfig(Consumer<KafkaTuningConfigBuilder> updateTuningConfig)
+ {
+ updateTuningConfig.accept(this.tuningConfig);
+ return this;
+ }
+
+ public KafkaSupervisorSpecBuilder
withIoConfig(Consumer<KafkaIOConfigBuilder> updateIOConfig)
+ {
+ updateIOConfig.accept(this.ioConfig);
+ return this;
+ }
+
+ public KafkaSupervisorSpecBuilder withId(String id)
+ {
+ this.id = id;
+ return this;
+ }
+
+ /**
+ * Builds a new {@link KafkaSupervisorSpec} with the specified parameters.
+ */
+ public KafkaSupervisorSpec build(String dataSource, String topic)
+ {
+ dataSchema.withDataSource(dataSource);
+ ioConfig.withTopic(topic).withTopicPattern(null);
+ return build();
+ }
+
+ /**
+ * Builds a new {@link KafkaSupervisorSpec} which reads from multiple topics
+ * that satisfy the given {@code topicPattern}.
+ */
+ public KafkaSupervisorSpec buildWithTopicPattern(String dataSource, String
topicPattern)
+ {
+ dataSchema.withDataSource(dataSource);
+ ioConfig.withTopic(null).withTopicPattern(topicPattern);
+ return build();
+ }
+
+ private KafkaSupervisorSpec build()
+ {
+ return new KafkaSupervisorSpec(
+ id,
+ null,
+ dataSchema.build(),
+ tuningConfig.build(),
+ ioConfig.build(),
+ null,
+ false,
+ // Jackson injected params, not needed while posting a supervisor to
the Overlord
+ null, null, null, null, null, null, null, null, null
+ );
+ }
+}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
index d7bb9acfb7f..0e47869018f 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorSpecTest.java
@@ -20,9 +20,8 @@
package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.databind.InjectableValues;
-import com.fasterxml.jackson.databind.Module;
import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.error.DruidException;
import org.apache.druid.error.DruidExceptionMatcher;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
@@ -35,7 +34,6 @@ import
org.apache.druid.indexing.overlord.supervisor.SupervisorStateManagerConfi
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
-import org.apache.druid.java.util.common.parsers.JSONPathSpec;
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.metrics.DruidMonitorSchedulerConfig;
import org.apache.druid.math.expr.ExprMacroTable;
@@ -43,7 +41,6 @@ import org.apache.druid.metadata.TestSupervisorSpec;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.expression.LookupEnabledTestExprMacroTable;
import org.apache.druid.segment.incremental.RowIngestionMetersFactory;
-import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.server.metrics.NoopServiceEmitter;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
@@ -74,7 +71,7 @@ public class KafkaSupervisorSpecTest
.addValue(SupervisorStateManagerConfig.class, null)
.addValue(ExprMacroTable.class.getName(),
LookupEnabledTestExprMacroTable.INSTANCE)
);
- mapper.registerModules((Iterable<Module>) new
KafkaIndexTaskModule().getJacksonModules());
+ mapper.registerModules(new KafkaIndexTaskModule().getJacksonModules());
}
@Test
@@ -676,95 +673,39 @@ public class KafkaSupervisorSpecTest
);
// Test valid spec update. This spec changes context vs the sourceSpec
- KafkaSupervisorSpec validDestSpec = new KafkaSupervisorSpec(
- null,
- null,
- DataSchema.builder().withDataSource("testDs").withAggregators(new
CountAggregatorFactory("rows")).withGranularity(new
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(),
- null,
- new KafkaSupervisorIOConfig(
- "metrics",
- null,
- new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
- null,
- null,
- null,
- Map.of("bootstrap.servers", "localhost:9092"),
- null,
- null,
- null,
- null,
- null,
- true,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- false
- ),
- Map.of(
- "key1",
- "value1",
- "key2",
- "value2"
- ),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
+ KafkaSupervisorSpec validDestSpec = new KafkaSupervisorSpecBuilder()
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(new TimestampSpec(null, null, null))
+ .withAggregators(new CountAggregatorFactory("rows"))
+ .withGranularity(new UniformGranularitySpec(Granularities.DAY,
Granularities.NONE, null))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withJsonInputFormat()
+ .withConsumerProperties(Map.of("bootstrap.servers",
"localhost:9092"))
+ )
+ .build("testDs", "metrics");
sourceSpec.validateSpecUpdateTo(validDestSpec);
}
private KafkaSupervisorSpec getSpec(String topic, String topicPattern)
{
- return new KafkaSupervisorSpec(
- null,
- null,
- DataSchema.builder().withDataSource("testDs").withAggregators(new
CountAggregatorFactory("rows")).withGranularity(new
UniformGranularitySpec(Granularities.DAY, Granularities.NONE, null)).build(),
- null,
- new KafkaSupervisorIOConfig(
- topic,
- topicPattern,
- new JsonInputFormat(JSONPathSpec.DEFAULT, null, null, null, null),
- null,
- null,
- null,
- Map.of("bootstrap.servers", "localhost:9092"),
- null,
- null,
- null,
- null,
- null,
- true,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- false
- ),
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
+ KafkaSupervisorSpecBuilder builder = new KafkaSupervisorSpecBuilder()
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(new TimestampSpec(null, null, null))
+ .withAggregators(new CountAggregatorFactory("rows"))
+ .withGranularity(new UniformGranularitySpec(Granularities.DAY,
Granularities.NONE, null))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withJsonInputFormat()
+ .withConsumerProperties(Map.of("bootstrap.servers",
"localhost:9092"))
+ );
+
+ return topic == null
+ ? builder.buildWithTopicPattern("testDs", topicPattern)
+ : builder.build("testDs", topic);
}
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
index 9b80d2db41a..3c30cc0e25c 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTest.java
@@ -152,7 +152,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
private static final String TOPIC_PREFIX = "testTopic";
private static final String DATASOURCE = "testDS";
private static final int NUM_PARTITIONS = 3;
- private static final long TEST_CHAT_RETRIES = 9L;
+ private static final int TEST_CHAT_RETRIES = 9;
private static final Period TEST_HTTP_TIMEOUT = new Period("PT10S");
private static final Period TEST_SHUTDOWN_TIMEOUT = new Period("PT80S");
@@ -308,59 +308,27 @@ public class KafkaSupervisorTest extends EasyMockSupport
consumerProperties.put("myCustomKey", "myCustomValue");
consumerProperties.put("bootstrap.servers", kafkaHost);
- KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new
KafkaSupervisorIOConfig(
- topic,
- null,
- INPUT_FORMAT,
- replicas,
- 1,
- new Period("PT1H"),
- consumerProperties,
- OBJECT_MAPPER.convertValue(autoScalerConfig,
LagBasedAutoScalerConfig.class),
- null,
- KafkaSupervisorIOConfig.DEFAULT_POLL_TIMEOUT_MILLIS,
- new Period("P1D"),
- new Period("PT30S"),
- true,
- new Period("PT30M"),
- null,
- null,
- null,
- null,
- new IdleConfig(true, 1000L),
- 1,
- false
- );
-
- final KafkaSupervisorTuningConfig tuningConfigOri = new
KafkaSupervisorTuningConfig(
- null,
- 1000,
- null,
- null,
- 50000,
- null,
- new Period("P1Y"),
- null,
- null,
- null,
- false,
- null,
- false,
- null,
- numThreads,
- TEST_CHAT_RETRIES,
- TEST_HTTP_TIMEOUT,
- TEST_SHUTDOWN_TIMEOUT,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
-
+ KafkaSupervisorIOConfig kafkaSupervisorIOConfig = new
KafkaIOConfigBuilder()
+ .withTopic(topic)
+ .withInputFormat(INPUT_FORMAT)
+ .withReplicas(replicas)
+ .withTaskCount(1)
+ .withConsumerProperties(consumerProperties)
+ .withAutoScalerConfig(OBJECT_MAPPER.convertValue(autoScalerConfig,
LagBasedAutoScalerConfig.class))
+ .withUseEarliestSequenceNumber(true)
+ .build();
+
+ final KafkaSupervisorTuningConfig tuningConfigOri = new
KafkaTuningConfigBuilder()
+ .withIntermediatePersistPeriod(Period.years(1))
+ .withResetOffsetAutomatically(false)
+ .withWorkerThreads(numThreads)
+ .withShutdownTimeout(TEST_SHUTDOWN_TIMEOUT)
+ .withMaxRowsInMemory(1000)
+ .withMaxRowsPerSegment(50000)
+ .withReportParseExceptions(false)
+ .withChatHandlerNumRetries(TEST_CHAT_RETRIES)
+ .withChatHandlerTimeout(TEST_HTTP_TIMEOUT.toStandardDuration())
+ .build();
EasyMock.expect(ingestionSchema.getIOConfig()).andReturn(kafkaSupervisorIOConfig).anyTimes();
EasyMock.expect(ingestionSchema.getDataSchema()).andReturn(dataSchema).anyTimes();
EasyMock.expect(ingestionSchema.getTuningConfig()).andReturn(tuningConfigOri).anyTimes();
@@ -4649,34 +4617,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
false,
kafkaHost,
dataSchema,
- new KafkaSupervisorTuningConfig(
- null,
- 1000,
- null,
- null,
- 50000,
- null,
- new Period("P1Y"),
- null,
- null,
- null,
- false,
- null,
- false,
- null,
- numThreads,
- TEST_CHAT_RETRIES,
- TEST_HTTP_TIMEOUT,
- TEST_SHUTDOWN_TIMEOUT,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- )
+ tuningConfigBuilder().build()
);
supervisor.addTaskGroupToActivelyReadingTaskGroup(
@@ -4690,34 +4631,9 @@ public class KafkaSupervisorTest extends EasyMockSupport
DataSchema modifiedDataSchema = getDataSchema("some other datasource");
- KafkaSupervisorTuningConfig modifiedTuningConfig = new
KafkaSupervisorTuningConfig(
- null,
- 42, // This is different
- null,
- null,
- 50000,
- null,
- new Period("P1Y"),
- null,
- null,
- null,
- false,
- null,
- null,
- null,
- numThreads,
- TEST_CHAT_RETRIES,
- TEST_HTTP_TIMEOUT,
- TEST_SHUTDOWN_TIMEOUT,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
+ KafkaSupervisorTuningConfig modifiedTuningConfig = tuningConfigBuilder()
+ .withMaxRowsInMemory(42)
+ .build();
KafkaIndexTask completedTaskFromStorage = createKafkaIndexTask(
"id0",
@@ -4845,34 +4761,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
false,
kafkaHost,
dataSchema,
- new KafkaSupervisorTuningConfig(
- null,
- 1000,
- null,
- null,
- 50000,
- null,
- new Period("P1Y"),
- null,
- null,
- null,
- false,
- null,
- false,
- null,
- numThreads,
- TEST_CHAT_RETRIES,
- TEST_HTTP_TIMEOUT,
- TEST_SHUTDOWN_TIMEOUT,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- )
+ tuningConfigBuilder().build()
);
// Create task1 with some start and end offsets
@@ -5208,6 +5097,19 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
}
+ private KafkaTuningConfigBuilder tuningConfigBuilder()
+ {
+ return new KafkaTuningConfigBuilder()
+ .withIntermediatePersistPeriod(Period.years(1))
+ .withResetOffsetAutomatically(false)
+ .withWorkerThreads(numThreads)
+ .withShutdownTimeout(TEST_SHUTDOWN_TIMEOUT)
+ .withMaxRowsInMemory(1000)
+ .withMaxRowsPerSegment(50000)
+ .withReportParseExceptions(false)
+ .withChatHandlerNumRetries(TEST_CHAT_RETRIES)
+ .withChatHandlerTimeout(TEST_HTTP_TIMEOUT.toStandardDuration());
+ }
private TestableKafkaSupervisor getTestableSupervisor(
int replicas,
@@ -5366,34 +5268,10 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
};
- final KafkaSupervisorTuningConfig tuningConfig = new
KafkaSupervisorTuningConfig(
- null,
- 1000,
- null,
- null,
- 50000,
- null,
- new Period("P1Y"),
- null,
- null,
- null,
- false,
- null,
- resetOffsetAutomatically,
- null,
- numThreads,
- TEST_CHAT_RETRIES,
- TEST_HTTP_TIMEOUT,
- TEST_SHUTDOWN_TIMEOUT,
- null,
- null,
- null,
- null,
- 10,
- null,
- null,
- null
- );
+ final KafkaSupervisorTuningConfig tuningConfig = tuningConfigBuilder()
+ .withMaxSavedParseExceptions(10)
+ .withResetOffsetAutomatically(resetOffsetAutomatically)
+ .build();
return new TestableKafkaSupervisor(
taskStorage,
@@ -5484,34 +5362,7 @@ public class KafkaSupervisorTest extends EasyMockSupport
}
};
- final KafkaSupervisorTuningConfig tuningConfig = new KafkaSupervisorTuningConfig(
- null,
- 1000,
- null,
- null,
- 50000,
- null,
- new Period("P1Y"),
- null,
- null,
- null,
- false,
- null,
- false,
- null,
- numThreads,
- TEST_CHAT_RETRIES,
- TEST_HTTP_TIMEOUT,
- TEST_SHUTDOWN_TIMEOUT,
- null,
- null,
- null,
- null,
- null,
- null,
- null,
- null
- );
+ final KafkaSupervisorTuningConfig tuningConfig = tuningConfigBuilder().build();
return new TestableKafkaSupervisorWithCustomIsTaskCurrent(
taskStorage,
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
index cda5904edda..376ca256303 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfigTest.java
@@ -64,7 +64,7 @@ public class KafkaSupervisorTuningConfigTest
Assert.assertEquals(0, config.getMaxPendingPersists());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpec());
Assert.assertEquals(IndexSpec.DEFAULT, config.getIndexSpecForIntermediatePersists());
- Assert.assertEquals(false, config.isReportParseExceptions());
+ Assert.assertFalse(config.isReportParseExceptions());
Assert.assertEquals(java.time.Duration.ofMinutes(15).toMillis(), config.getHandoffConditionTimeout());
Assert.assertNull(config.getWorkerThreads());
Assert.assertEquals(8L, (long) config.getChatRetries());
@@ -111,7 +111,7 @@ public class KafkaSupervisorTuningConfigTest
Assert.assertEquals(100, config.getMaxRowsPerSegment().intValue());
Assert.assertEquals(new Period("PT1H"), config.getIntermediatePersistPeriod());
Assert.assertEquals(100, config.getMaxPendingPersists());
- Assert.assertEquals(true, config.isReportParseExceptions());
+ Assert.assertTrue(config.isReportParseExceptions());
Assert.assertEquals(100, config.getHandoffConditionTimeout());
Assert.assertEquals(12, (int) config.getWorkerThreads());
Assert.assertEquals(14L, (long) config.getChatRetries());
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java
new file mode 100644
index 00000000000..44113519c9c
--- /dev/null
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.supervisor;
+
+import org.apache.druid.indexing.common.task.TuningConfigBuilder;
+import org.joda.time.Period;
+
+/**
+ * Builder for {@link KafkaSupervisorTuningConfig} used in tests.
+ */
+public class KafkaTuningConfigBuilder extends TuningConfigBuilder<KafkaTuningConfigBuilder, KafkaSupervisorTuningConfig>
+{
+ private Period intermediatePersistPeriod;
+ private Long handoffConditionTimeout;
+ private Boolean resetOffsetAutomatically;
+ private Integer workerThreads;
+ private Period shutdownTimeout;
+ private Period offsetFetchPeriod;
+ private Period intermediateHandoffPeriod;
+ private Boolean releaseLocksOnHandoff;
+
+ public KafkaTuningConfigBuilder withIntermediatePersistPeriod(Period intermediatePersistPeriod)
+ {
+ this.intermediatePersistPeriod = intermediatePersistPeriod;
+ return this;
+ }
+
+ public KafkaTuningConfigBuilder withHandoffConditionTimeout(Long handoffConditionTimeout)
+ {
+ this.handoffConditionTimeout = handoffConditionTimeout;
+ return this;
+ }
+
+ public KafkaTuningConfigBuilder withResetOffsetAutomatically(Boolean resetOffsetAutomatically)
+ {
+ this.resetOffsetAutomatically = resetOffsetAutomatically;
+ return this;
+ }
+
+ public KafkaTuningConfigBuilder withWorkerThreads(Integer workerThreads)
+ {
+ this.workerThreads = workerThreads;
+ return this;
+ }
+
+ public KafkaTuningConfigBuilder withShutdownTimeout(Period shutdownTimeout)
+ {
+ this.shutdownTimeout = shutdownTimeout;
+ return this;
+ }
+
+ public KafkaTuningConfigBuilder withOffsetFetchPeriod(Period offsetFetchPeriod)
+ {
+ this.offsetFetchPeriod = offsetFetchPeriod;
+ return this;
+ }
+
+ public KafkaTuningConfigBuilder withIntermediateHandoffPeriod(Period intermediateHandoffPeriod)
+ {
+ this.intermediateHandoffPeriod = intermediateHandoffPeriod;
+ return this;
+ }
+
+ public KafkaTuningConfigBuilder withReleaseLocksOnHandoff(boolean releaseLocksOnHandoff)
+ {
+ this.releaseLocksOnHandoff = releaseLocksOnHandoff;
+ return this;
+ }
+
+ @Override
+ public KafkaSupervisorTuningConfig build()
+ {
+ return new KafkaSupervisorTuningConfig(
+ appendableIndexSpec,
+ maxRowsInMemory,
+ maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
+ maxRowsPerSegment,
+ maxTotalRows,
+ intermediatePersistPeriod,
+ maxPendingPersists,
+ indexSpec,
+ indexSpecForIntermediatePersists,
+ reportParseExceptions,
+ handoffConditionTimeout,
+ resetOffsetAutomatically,
+ segmentWriteOutMediumFactory,
+ workerThreads,
+ chatHandlerNumRetries == null ? null : (long) chatHandlerNumRetries,
+ chatHandlerTimeout == null ? null : new Period(chatHandlerTimeout.getMillis()),
+ shutdownTimeout,
+ offsetFetchPeriod,
+ intermediateHandoffPeriod,
+ logParseExceptions,
+ maxParseExceptions,
+ maxSavedParseExceptions,
+ numPersistThreads,
+ maxColumnsToMerge,
+ releaseLocksOnHandoff
+ );
+ }
+}
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index f0da679d4a4..39a2396a0fb 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -267,6 +267,12 @@
<artifactId>jdbi</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.google.api.grpc</groupId>
+ <artifactId>proto-google-common-protos</artifactId>
+ <version>2.48.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java
index f6e0e29f7d9..15f0d62ac45 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TaskBuilder.java
@@ -61,13 +61,19 @@ import java.util.stream.Stream;
* The builder does not use any defaults and all required fields must be set
* explicitly.
*
- * @param <T> Type of task created by this builder.
+ * @param <Self> Type of this builder itself
* @param <C> Type of tuning config used by this builder.
+ * @param <T> Type of task created by this builder.
+ * @param <CB> Type of tuning config builder
* @see #ofTypeIndex()
* @see #tuningConfig(Consumer) to specify the {@code tuningConfig}.
*/
@SuppressWarnings("unchecked")
-public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends Task>
+public abstract class TaskBuilder<
+ T extends Task,
+ C,
+ CB extends TuningConfigBuilder<CB, C>,
+ Self extends TaskBuilder<T, C, CB, Self>>
{
// Fields are package-protected to allow access by subclasses like Index, Compact
InputSource inputSource = null;
@@ -77,7 +83,7 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends T
Boolean appendToExisting = null;
- final TuningConfigBuilder<C> tuningConfig;
+ final TuningConfigBuilder<CB, C> tuningConfig;
private TaskBuilder()
{
@@ -90,9 +96,9 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends T
*/
public abstract T withId(String taskId);
- public abstract B dataSource(String dataSource);
+ public abstract Self dataSource(String dataSource);
- protected abstract TuningConfigBuilder<C> tuningConfigBuilder();
+ protected abstract TuningConfigBuilder<CB, C> tuningConfigBuilder();
/**
* Initializes builder for a new {@link IndexTask}.
@@ -115,18 +121,18 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends T
return new Compact();
}
- public B inputSource(InputSource inputSource)
+ public Self inputSource(InputSource inputSource)
{
this.inputSource = inputSource;
- return (B) this;
+ return (Self) this;
}
- public B inlineInputSourceWithData(String data)
+ public Self inlineInputSourceWithData(String data)
{
return inputSource(new InlineInputSource(data));
}
- public B druidInputSource(String dataSource, Interval interval)
+ public Self druidInputSource(String dataSource, Interval interval)
{
return inputSource(
new DruidInputSource(
@@ -153,94 +159,98 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends T
* }
* </pre>
*/
- public B localInputSourceWithFiles(File... files)
+ public Self localInputSourceWithFiles(File... files)
{
return inputSource(
new LocalInputSource(null, null, List.of(files), null)
);
}
- public B inputFormat(InputFormat inputFormat)
+ public Self inputFormat(InputFormat inputFormat)
{
this.inputFormat = inputFormat;
- return (B) this;
+ return (Self) this;
}
- public B jsonInputFormat()
+ public Self jsonInputFormat()
{
return inputFormat(
new JsonInputFormat(null, null, null, null, null)
);
}
- public B csvInputFormatWithColumns(String... columns)
+ public Self csvInputFormatWithColumns(String... columns)
{
return inputFormat(
new CsvInputFormat(List.of(columns), null, null, false, 0, null)
);
}
- public B appendToExisting(boolean append)
+ public Self appendToExisting(boolean append)
{
this.appendToExisting = append;
- return (B) this;
+ return (Self) this;
}
- public B dynamicPartitionWithMaxRows(int maxRowsPerSegment)
+ public Self dynamicPartitionWithMaxRows(int maxRowsPerSegment)
{
tuningConfig.withPartitionsSpec(new DynamicPartitionsSpec(maxRowsPerSegment, null));
- return (B) this;
+ return (Self) this;
}
- public B tuningConfig(Consumer<TuningConfigBuilder<C>> updateTuningConfig)
+ public Self tuningConfig(Consumer<TuningConfigBuilder<CB, C>> updateTuningConfig)
{
updateTuningConfig.accept(tuningConfig);
- return (B) this;
+ return (Self) this;
}
- public B context(String key, Object value)
+ public Self context(String key, Object value)
{
this.context.put(key, value);
- return (B) this;
+ return (Self) this;
}
- public abstract static class IndexCommon<B extends TaskBuilder<B, C, T>, C, T extends Task>
- extends TaskBuilder<B, C, T>
+ public abstract static class IndexCommon<
+ T extends Task,
+ C,
+ CB extends TuningConfigBuilder<CB, C>,
+ Self extends TaskBuilder<T, C, CB, Self>>
+ extends TaskBuilder<T, C, CB, Self>
{
final DataSchema.Builder dataSchema = DataSchema.builder();
@Override
- public B dataSource(String dataSource)
+ public Self dataSource(String dataSource)
{
dataSchema.withDataSource(dataSource);
- return (B) this;
+ return (Self) this;
}
- public B dataSchema(Consumer<DataSchema.Builder> updateDataSchema)
+ public Self dataSchema(Consumer<DataSchema.Builder> updateDataSchema)
{
updateDataSchema.accept(dataSchema);
- return (B) this;
+ return (Self) this;
}
- public B isoTimestampColumn(String timestampColumn)
+ public Self isoTimestampColumn(String timestampColumn)
{
dataSchema.withTimestamp(new TimestampSpec(timestampColumn, "iso", null));
- return (B) this;
+ return (Self) this;
}
- public B timestampColumn(String timestampColumn)
+ public Self timestampColumn(String timestampColumn)
{
dataSchema.withTimestamp(new TimestampSpec(timestampColumn, null, null));
- return (B) this;
+ return (Self) this;
}
- public B granularitySpec(GranularitySpec granularitySpec)
+ public Self granularitySpec(GranularitySpec granularitySpec)
{
dataSchema.withGranularity(granularitySpec);
- return (B) this;
+ return (Self) this;
}
- public B granularitySpec(String segmentGranularity, String queryGranularity, Boolean rollup)
+ public Self granularitySpec(String segmentGranularity, String queryGranularity, Boolean rollup)
{
dataSchema.withGranularity(
new UniformGranularitySpec(
@@ -250,13 +260,13 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends T
null
)
);
- return (B) this;
+ return (Self) this;
}
/**
* Sets {@code "granularitySpec": {"segmentGranularity": <arg>}}.
*/
- public B segmentGranularity(String granularity)
+ public Self segmentGranularity(String granularity)
{
return granularitySpec(granularity, null, null);
}
@@ -266,30 +276,34 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends T
*
* @see #dataSchema(Consumer) for more options
*/
- public B dimensions(String... dimensions)
+ public Self dimensions(String... dimensions)
{
dataSchema.withDimensions(
Stream.of(dimensions)
.map(StringDimensionSchema::new)
.collect(Collectors.toList())
);
- return (B) this;
+ return (Self) this;
}
- public B metricAggregates(AggregatorFactory... aggregators)
+ public Self metricAggregates(AggregatorFactory... aggregators)
{
dataSchema.withAggregators(aggregators);
- return (B) this;
+ return (Self) this;
}
}
/**
* Builder for {@link IndexTask} that uses a {@link IndexTask.IndexTuningConfig}.
*/
- public static class Index extends IndexCommon<Index, IndexTask.IndexTuningConfig, IndexTask>
+ public static class Index extends IndexCommon<
+ IndexTask,
+ IndexTask.IndexTuningConfig,
+ TuningConfigBuilder.Index,
+ Index>
{
@Override
- public TuningConfigBuilder<IndexTask.IndexTuningConfig> tuningConfigBuilder()
+ public TuningConfigBuilder.Index tuningConfigBuilder()
{
return TuningConfigBuilder.forIndexTask();
}
@@ -320,7 +334,11 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends T
/**
* Builder for {@link ParallelIndexSupervisorTask} which uses a {@link ParallelIndexTuningConfig}.
*/
- public static class IndexParallel extends IndexCommon<IndexParallel, ParallelIndexTuningConfig, ParallelIndexSupervisorTask>
+ public static class IndexParallel extends IndexCommon<
+ ParallelIndexSupervisorTask,
+ ParallelIndexTuningConfig,
+ TuningConfigBuilder.ParallelIndex,
+ IndexParallel>
{
@Override
public ParallelIndexSupervisorTask withId(String taskId)
@@ -345,7 +363,7 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends T
}
@Override
- public TuningConfigBuilder<ParallelIndexTuningConfig> tuningConfigBuilder()
+ public TuningConfigBuilder.ParallelIndex tuningConfigBuilder()
{
return TuningConfigBuilder.forParallelIndexTask();
}
@@ -354,7 +372,11 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends T
/**
* Builder for a {@link CompactionTask} which uses a {@link CompactionTask.CompactionTuningConfig}.
*/
- public static class Compact extends TaskBuilder<Compact, CompactionTask.CompactionTuningConfig, CompactionTask>
+ public static class Compact extends TaskBuilder<
+ CompactionTask,
+ CompactionTask.CompactionTuningConfig,
+ TuningConfigBuilder.Compact,
+ Compact>
{
private String dataSource;
private Interval interval;
@@ -429,7 +451,7 @@ public abstract class TaskBuilder<B extends TaskBuilder<B, C, T>, C, T extends T
}
@Override
- public TuningConfigBuilder<CompactionTask.CompactionTuningConfig> tuningConfigBuilder()
+ public TuningConfigBuilder.Compact tuningConfigBuilder()
{
return TuningConfigBuilder.forCompactionTask();
}
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java
index c46c0becc76..abac64de8be 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/TuningConfigBuilder.java
@@ -31,11 +31,12 @@ import org.joda.time.Duration;
* Builder utility for various task tuning configs.
*
* @param <C> Type of config that is being built
+ * @param <Self> Type of this builder itself
* @see TuningConfigBuilder#forIndexTask()
* @see TuningConfigBuilder#forParallelIndexTask()
* @see TuningConfigBuilder#forCompactionTask()
*/
-public abstract class TuningConfigBuilder<C>
+public abstract class TuningConfigBuilder<Self extends TuningConfigBuilder<Self, C>, C>
{
protected Integer targetPartitionSize;
protected Integer maxRowsPerSegment;
@@ -71,202 +72,208 @@ public abstract class TuningConfigBuilder<C>
protected Integer maxAllowedLockCount;
protected Integer numPersistThreads;
- public TuningConfigBuilder<C> withTargetPartitionSize(Integer targetPartitionSize)
+ public Self withTargetPartitionSize(Integer targetPartitionSize)
{
this.targetPartitionSize = targetPartitionSize;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxRowsPerSegment(Integer maxRowsPerSegment)
+ public Self withMaxRowsPerSegment(Integer maxRowsPerSegment)
{
this.maxRowsPerSegment = maxRowsPerSegment;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withAppendableIndexSpec(AppendableIndexSpec appendableIndexSpec)
+ public Self withAppendableIndexSpec(AppendableIndexSpec appendableIndexSpec)
{
this.appendableIndexSpec = appendableIndexSpec;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxRowsInMemory(Integer maxRowsInMemory)
+ public Self withMaxRowsInMemory(Integer maxRowsInMemory)
{
this.maxRowsInMemory = maxRowsInMemory;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxBytesInMemory(Long maxBytesInMemory)
+ public Self withMaxBytesInMemory(Long maxBytesInMemory)
{
this.maxBytesInMemory = maxBytesInMemory;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withSkipBytesInMemoryOverheadCheck(Boolean skipBytesInMemoryOverheadCheck)
+ public Self withSkipBytesInMemoryOverheadCheck(Boolean skipBytesInMemoryOverheadCheck)
{
this.skipBytesInMemoryOverheadCheck = skipBytesInMemoryOverheadCheck;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxTotalRows(Long maxTotalRows)
+ public Self withMaxTotalRows(Long maxTotalRows)
{
this.maxTotalRows = maxTotalRows;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withNumShards(Integer numShards)
+ public Self withNumShards(Integer numShards)
{
this.numShards = numShards;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withSplitHintSpec(SplitHintSpec splitHintSpec)
+ public Self withSplitHintSpec(SplitHintSpec splitHintSpec)
{
this.splitHintSpec = splitHintSpec;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withPartitionsSpec(PartitionsSpec partitionsSpec)
+ public Self withPartitionsSpec(PartitionsSpec partitionsSpec)
{
this.partitionsSpec = partitionsSpec;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withIndexSpec(IndexSpec indexSpec)
+ public Self withIndexSpec(IndexSpec indexSpec)
{
this.indexSpec = indexSpec;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withIndexSpecForIntermediatePersists(IndexSpec indexSpecForIntermediatePersists)
+ public Self withIndexSpecForIntermediatePersists(IndexSpec indexSpecForIntermediatePersists)
{
this.indexSpecForIntermediatePersists = indexSpecForIntermediatePersists;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxPendingPersists(Integer maxPendingPersists)
+ public Self withMaxPendingPersists(Integer maxPendingPersists)
{
this.maxPendingPersists = maxPendingPersists;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withForceGuaranteedRollup(Boolean forceGuaranteedRollup)
+ public Self withForceGuaranteedRollup(Boolean forceGuaranteedRollup)
{
this.forceGuaranteedRollup = forceGuaranteedRollup;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withReportParseExceptions(Boolean reportParseExceptions)
+ public Self withReportParseExceptions(Boolean reportParseExceptions)
{
this.reportParseExceptions = reportParseExceptions;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withPushTimeout(Long pushTimeout)
+ public Self withPushTimeout(Long pushTimeout)
{
this.pushTimeout = pushTimeout;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withPublishTimeout(Long publishTimeout)
+ public Self withPublishTimeout(Long publishTimeout)
{
this.publishTimeout = publishTimeout;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withSegmentWriteOutMediumFactory(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
+ public Self withSegmentWriteOutMediumFactory(SegmentWriteOutMediumFactory segmentWriteOutMediumFactory)
{
this.segmentWriteOutMediumFactory = segmentWriteOutMediumFactory;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxNumSubTasks(Integer maxNumSubTasks)
+ public Self withMaxNumSubTasks(Integer maxNumSubTasks)
{
this.maxNumSubTasks = maxNumSubTasks;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxNumConcurrentSubTasks(Integer maxNumConcurrentSubTasks)
+ public Self withMaxNumConcurrentSubTasks(Integer maxNumConcurrentSubTasks)
{
this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxRetry(Integer maxRetry)
+ public Self withMaxRetry(Integer maxRetry)
{
this.maxRetry = maxRetry;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withTaskStatusCheckPeriodMs(Long taskStatusCheckPeriodMs)
+ public Self withTaskStatusCheckPeriodMs(Long taskStatusCheckPeriodMs)
{
this.taskStatusCheckPeriodMs = taskStatusCheckPeriodMs;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withChatHandlerTimeout(Duration chatHandlerTimeout)
+ public Self withChatHandlerTimeout(Duration chatHandlerTimeout)
{
this.chatHandlerTimeout = chatHandlerTimeout;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withChatHandlerNumRetries(Integer chatHandlerNumRetries)
+ public Self withChatHandlerNumRetries(Integer chatHandlerNumRetries)
{
this.chatHandlerNumRetries = chatHandlerNumRetries;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxNumSegmentsToMerge(Integer maxNumSegmentsToMerge)
+ public Self withMaxNumSegmentsToMerge(Integer maxNumSegmentsToMerge)
{
this.maxNumSegmentsToMerge = maxNumSegmentsToMerge;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withTotalNumMergeTasks(Integer totalNumMergeTasks)
+ public Self withTotalNumMergeTasks(Integer totalNumMergeTasks)
{
this.totalNumMergeTasks = totalNumMergeTasks;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withLogParseExceptions(Boolean logParseExceptions)
+ public Self withLogParseExceptions(Boolean logParseExceptions)
{
this.logParseExceptions = logParseExceptions;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxParseExceptions(Integer maxParseExceptions)
+ public Self withMaxParseExceptions(Integer maxParseExceptions)
{
this.maxParseExceptions = maxParseExceptions;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxSavedParseExceptions(Integer maxSavedParseExceptions)
+ public Self withMaxSavedParseExceptions(Integer maxSavedParseExceptions)
{
this.maxSavedParseExceptions = maxSavedParseExceptions;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxColumnsToMerge(Integer maxColumnsToMerge)
+ public Self withMaxColumnsToMerge(Integer maxColumnsToMerge)
{
this.maxColumnsToMerge = maxColumnsToMerge;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withAwaitSegmentAvailabilityTimeoutMillis(Long awaitSegmentAvailabilityTimeoutMillis)
+ public Self withAwaitSegmentAvailabilityTimeoutMillis(Long awaitSegmentAvailabilityTimeoutMillis)
{
this.awaitSegmentAvailabilityTimeoutMillis = awaitSegmentAvailabilityTimeoutMillis;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withNumPersistThreads(Integer numPersistThreads)
+ public Self withNumPersistThreads(Integer numPersistThreads)
{
this.numPersistThreads = numPersistThreads;
- return this;
+ return self();
}
- public TuningConfigBuilder<C> withMaxAllowedLockCount(Integer maxAllowedLockCount)
+ public Self withMaxAllowedLockCount(Integer maxAllowedLockCount)
{
this.maxAllowedLockCount = maxAllowedLockCount;
- return this;
+ return self();
+ }
+
+ @SuppressWarnings("unchecked")
+ private Self self()
+ {
+ return (Self) this;
}
public abstract C build();
@@ -295,7 +302,7 @@ public abstract class TuningConfigBuilder<C>
return new Index();
}
- public static class Index extends TuningConfigBuilder<IndexTask.IndexTuningConfig>
+ public static class Index extends TuningConfigBuilder<Index, IndexTask.IndexTuningConfig>
{
@Override
public IndexTask.IndexTuningConfig build()
@@ -330,7 +337,7 @@ public abstract class TuningConfigBuilder<C>
}
}
- public static class ParallelIndex extends TuningConfigBuilder<ParallelIndexTuningConfig>
+ public static class ParallelIndex extends TuningConfigBuilder<ParallelIndex, ParallelIndexTuningConfig>
{
@Override
public ParallelIndexTuningConfig build()
@@ -372,7 +379,7 @@ public abstract class TuningConfigBuilder<C>
}
}
- public static class Compact extends TuningConfigBuilder<CompactionTask.CompactionTuningConfig>
+ public static class Compact extends TuningConfigBuilder<Compact, CompactionTask.CompactionTuningConfig>
{
@Override
public CompactionTask.CompactionTuningConfig build()
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SupervisorIOConfigBuilder.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SupervisorIOConfigBuilder.java
new file mode 100644
index 00000000000..bcfb33e9d12
--- /dev/null
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/supervisor/SupervisorIOConfigBuilder.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream.supervisor;
+
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.impl.JsonInputFormat;
+import org.apache.druid.indexing.seekablestream.supervisor.autoscaler.AutoScalerConfig;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+
+/**
+ * Builder for a {@link SeekableStreamSupervisorIOConfig}.
+ *
+ * @param <Self> Type of this builder itself
+ * @param <C> Type of the IOConfig created by this builder
+ */
+public abstract class SupervisorIOConfigBuilder<
+ Self extends SupervisorIOConfigBuilder<Self, C>,
+ C extends SeekableStreamSupervisorIOConfig>
+{
+ protected String stream;
+ protected InputFormat inputFormat;
+ protected Integer replicas;
+ protected Integer taskCount;
+ protected Period taskDuration;
+ protected Period startDelay;
+ protected Period period;
+ protected Boolean useEarliestSequenceNumber;
+ protected Period completionTimeout;
+ protected Period lateMessageRejectionPeriod;
+ protected Period earlyMessageRejectionPeriod;
+ protected AutoScalerConfig autoScalerConfig;
+ protected LagAggregator lagAggregator;
+ protected DateTime lateMessageRejectionStartDateTime;
+ protected IdleConfig idleConfig;
+ protected Integer stopTaskCount;
+
+ public Self withStream(String stream)
+ {
+ this.stream = stream;
+ return self();
+ }
+
+ public Self withInputFormat(InputFormat inputFormat)
+ {
+ this.inputFormat = inputFormat;
+ return self();
+ }
+
+ public Self withJsonInputFormat()
+ {
+ this.inputFormat = new JsonInputFormat(null, null, null, null, null);
+ return self();
+ }
+
+ public Self withReplicas(Integer replicas)
+ {
+ this.replicas = replicas;
+ return self();
+ }
+
+ public Self withTaskCount(Integer taskCount)
+ {
+ this.taskCount = taskCount;
+ return self();
+ }
+
+ public Self withTaskDuration(Period taskDuration)
+ {
+ this.taskDuration = taskDuration;
+ return self();
+ }
+
+ public Self withStartDelay(Period startDelay)
+ {
+ this.startDelay = startDelay;
+ return self();
+ }
+
+ public Self withSupervisorRunPeriod(Period period)
+ {
+ this.period = period;
+ return self();
+ }
+
+ public Self withUseEarliestSequenceNumber(Boolean useEarliestSequenceNumber)
+ {
+ this.useEarliestSequenceNumber = useEarliestSequenceNumber;
+ return self();
+ }
+
+ public Self withCompletionTimeout(Period completionTimeout)
+ {
+ this.completionTimeout = completionTimeout;
+ return self();
+ }
+
+ public Self withLateMessageRejectionPeriod(Period lateMessageRejectionPeriod)
+ {
+ this.lateMessageRejectionPeriod = lateMessageRejectionPeriod;
+ return self();
+ }
+
+ public Self withEarlyMessageRejectionPeriod(Period earlyMessageRejectionPeriod)
+ {
+ this.earlyMessageRejectionPeriod = earlyMessageRejectionPeriod;
+ return self();
+ }
+
+ public Self withAutoScalerConfig(AutoScalerConfig autoScalerConfig)
+ {
+ this.autoScalerConfig = autoScalerConfig;
+ return self();
+ }
+
+ public Self withLagAggregator(LagAggregator lagAggregator)
+ {
+ this.lagAggregator = lagAggregator;
+ return self();
+ }
+
+ public Self withLateMessageRejectionStartDateTime(DateTime lateMessageRejectionStartDateTime)
+ {
+ this.lateMessageRejectionStartDateTime = lateMessageRejectionStartDateTime;
+ return self();
+ }
+
+ public Self withIdleConfig(IdleConfig idleConfig)
+ {
+ this.idleConfig = idleConfig;
+ return self();
+ }
+
+ public Self withStopTaskCount(Integer stopTaskCount)
+ {
+ this.stopTaskCount = stopTaskCount;
+ return self();
+ }
+
+ public abstract C build();
+
+ @SuppressWarnings("unchecked")
+ private Self self()
+ {
+ return (Self) this;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]