github-code-scanning[bot] commented on code in PR #14805.
URL: https://github.com/apache/druid/pull/14805#discussion_r1347762477
########## indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java: ########## @@ -0,0 +1,855 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.joda.JodaModule; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Binder; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionSchema; +import 
org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.DefaultGenericQueryMetricsFactory; +import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.aggregation.AggregatorFactory; +import 
org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.scan.ScanQueryConfig; +import org.apache.druid.query.scan.ScanQueryEngine; +import org.apache.druid.query.scan.ScanQueryQueryToolChest; +import org.apache.druid.query.scan.ScanQueryRunnerFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesQueryEngine; +import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.incremental.RowMeters; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.apache.druid.server.security.AuthorizerMapper; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +@SuppressWarnings("unchecked") +@RunWith(Parameterized.class) +public class SeekableStreamIndexTaskRunnerTest extends SeekableStreamIndexTaskTestBase +{ + + private static final String STREAM = "stream"; + + private static final String DATASOURCE = 
"test_ds"; + + private static final String MESSAGE = "{\"id\": 1, \"age\": 10, \"timestamp\":\"2023-09-01T00:00:00.000\"}"; + + private static final String BASE_PERSIST_DIR = "./tmp"; + + private static RecordSupplier<String, String, ByteEntity> recordSupplier; + + private static ServiceEmitter emitter; + + private static SeekableStreamIndexTaskRunner taskRunner; + + public SeekableStreamIndexTaskRunnerTest(LockGranularity lockGranularity) + { + super(lockGranularity); + } + + @Parameterized.Parameters(name = "{0}") + public static Iterable<Object[]> constructorFeeder() + { + return ImmutableList.of( + new Object[]{LockGranularity.TIME_CHUNK}, + new Object[]{LockGranularity.SEGMENT} + ); + } + + @BeforeClass + public static void setupClass() + { + emitter = new ServiceEmitter( + "service", + "host", + new NoopEmitter() + ); + emitter.start(); + EmittingLogger.registerEmitter(emitter); + + taskExec = MoreExecutors.listeningDecorator( + Executors.newCachedThreadPool( + Execs.makeThreadFactory("runner-task-test-%d") + ) + ); + } + + @Before + public void setup() throws IOException + { + reportsFile = File.createTempFile("IndexTaskTestReports-" + System.currentTimeMillis(), "json"); + recordSupplier = new TestRecordSupplier(); + + TestUtils testUtils = new TestUtils(); + final ObjectMapper objectMapper = testUtils.getTestObjectMapper(); + + for (Module module : new TestIndexTaskModule().getJacksonModules()) { + objectMapper.registerModule(module); + } + + makeToolboxFactory(testUtils, emitter, false); + } + + @After + public void tearDownTest() throws IOException + { + synchronized (runningTasks) { + for (Task task : runningTasks) { + task.stopGracefully(toolboxFactory.build(task).getConfig()); + } + + runningTasks.clear(); + } + + reportsFile.delete(); + FileUtils.deleteDirectory(new File(BASE_PERSIST_DIR)); + destroyToolboxFactory(); + } + + @Test + public void testRunTaskWithoutIntermediateHandOff() throws ExecutionException, InterruptedException + { + 
TestSeekableStreamIndexTaskIOConfig taskIoConfig = new TestSeekableStreamIndexTaskIOConfig( + 0, + STREAM, + new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.singletonMap("0", "10"), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(STREAM, Collections.singletonMap("0", "20")), + null, + null, + null, + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false) + ); + + TestSeekableStreamIndexTaskTuningConfig taskTuningConfig = new TestSeekableStreamIndexTaskTuningConfig( + null, + null, + 10L, + false, + null, + null, + Period.seconds(1), + new File(BASE_PERSIST_DIR), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + SeekableStreamIndexTask task = new TestSeekableStreamIndexTask( + "id1", + null, + getDataSchema(), + taskTuningConfig, + taskIoConfig, + null, + "0" + ); + + taskRunner = new TestSeekableStreamIndexTaskRunner(task, getDataSchema().getParser(), task.authorizerMapper, LockGranularity.TIME_CHUNK); + + final ListenableFuture<TaskStatus> future = runTask(task); + Thread.sleep(5 * 1000L); + + Assert.assertEquals(0, countEvents(task)); + Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.READING, task.getRunner().getStatus()); + + taskRunner.pause(); + taskRunner.setEndOffsets(Collections.singletonMap("0", "11"), true); + + Thread.sleep(5 * 1000L); // wait for publishing segment + taskRunner.stopGracefully(); + + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + verifyTaskMetrics(task, RowMeters.with().bytes(MESSAGE.length()).unparseable(0).totalProcessed(1)); + + publishedDescriptors(); + publishedSegments(); + } + + @Test + public void testRunTaskWithIntermediateHandOff() throws ExecutionException, InterruptedException + { + TestSeekableStreamIndexTaskIOConfig taskIoConfig = new TestSeekableStreamIndexTaskIOConfig( + 0, + STREAM, + new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.singletonMap("0", 
"10"), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(STREAM, Collections.singletonMap("0", "20")), + null, + null, + null, + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false) + ); + + TestSeekableStreamIndexTaskTuningConfig taskTuningConfig = new TestSeekableStreamIndexTaskTuningConfig( + null, + null, + 10L, + false, + null, + null, + Period.seconds(1), + new File(BASE_PERSIST_DIR), + null, + null, + null, + null, + null, + null, + null, + Period.seconds(5), + null, + null, + null + ); + + + SeekableStreamIndexTask task = new TestSeekableStreamIndexTask( + "id1", + null, + getDataSchema(), + taskTuningConfig, + taskIoConfig, + null, + "0" + ); + + taskRunner = new TestSeekableStreamIndexTaskRunner(task, getDataSchema().getParser(), task.authorizerMapper, LockGranularity.TIME_CHUNK); + + final ListenableFuture<TaskStatus> future = runTask(task); + Thread.sleep(10 * 1000L); // > intermediateHandoffPeriod + + Assert.assertEquals(0, countEvents(task)); + Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.PAUSED, task.getRunner().getStatus()); + + // taskRunner.pause(); + taskRunner.setEndOffsets(Collections.singletonMap("0", "11"), true); + // taskRunner.possiblyResetDataSourceMetadata(this.toolbox, null, Collections.emptySet()); + + Thread.sleep(5 * 1000L); // wait for publishing segment + taskRunner.stopGracefully(); + + // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + verifyTaskMetrics(task, RowMeters.with().bytes(MESSAGE.length()).unparseable(0).totalProcessed(1)); + + publishedDescriptors(); + publishedSegments(); + } + + @Override + protected QueryRunnerFactoryConglomerate makeQueryRunnerConglomerate() + { + return new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap.<Class<? 
extends Query>, QueryRunnerFactory>builder() + .put( + TimeseriesQuery.class, + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(), + new TimeseriesQueryEngine(), + (query, future) -> { + // do nothing + } + ) + ).put( + ScanQuery.class, + new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ), + new ScanQueryEngine(), + new ScanQueryConfig() + ) + ) + .build() + ); + } + + public static class TestRecordSupplier implements RecordSupplier<String, String, ByteEntity> + { + @Override + public void assign(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public void seek(StreamPartition<String> partition, String sequenceNumber) + { + } + + @Override + public void seekToEarliest(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public void seekToLatest(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public Collection<StreamPartition<String>> getAssignment() + { + return Collections.singletonList(new StreamPartition<>(STREAM, "0")); + } + + @Override + public @NotNull List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long timeout) + { + return Collections.emptyList(); + } + + @Override + public String getLatestSequenceNumber(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public String getEarliestSequenceNumber(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public boolean isOffsetAvailable(StreamPartition<String> partition, OrderedSequenceNumber<String> offset) + { + return false; + } + + @Override + public String getPosition(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public Set<String> getPartitionIds(String stream) + { + return Sets.newHashSet("0"); + } + + @Override + public void close() + { + } + } + + public static class TestSeekableStreamSupervisorTuningConfig implements SeekableStreamSupervisorTuningConfig + 
{ + @Override + public Integer getWorkerThreads() + { + return 1; + } + + @Override + public Long getChatRetries() + { + return 1L; + } + + @Override + public Duration getHttpTimeout() + { + return new Period("PT1M").toStandardDuration(); + } + + @Override + public Duration getShutdownTimeout() + { + return new Period("PT1S").toStandardDuration(); + } + + @Override + public Duration getRepartitionTransitionDuration() + { + return new Period("PT2M").toStandardDuration(); + } + + @Override + public Duration getOffsetFetchPeriod() + { + return new Period("PT5M").toStandardDuration(); + } + + @Override + public TestSeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() + { + return new TestSeekableStreamIndexTaskTuningConfig( + null, + null, + 10L, + false, + null, + null, + Period.seconds(1), + new File("./tmp"), + null, + null, + null, + null, + null, + null, + null, + Period.seconds(5), + null, + null, + null + ); + } + } + + public static class TestSeekableStreamIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig + { + @JsonCreator + public TestSeekableStreamIndexTaskTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, + @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, + @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod, + @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, + @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, + @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, + 
@Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, + @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout, + @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically, + @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, + @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + ) + { + super( + appendableIndexSpec, + maxRowsInMemory, + maxBytesInMemory, + skipBytesInMemoryOverheadCheck, + maxRowsPerSegment, + maxTotalRows, + intermediatePersistPeriod, + basePersistDirectory, + maxPendingPersists, + indexSpec, + indexSpecForIntermediatePersists, + reportParseExceptions, + handoffConditionTimeout, + resetOffsetAutomatically, + false, + segmentWriteOutMediumFactory, + intermediateHandoffPeriod, + logParseExceptions, + maxParseExceptions, + maxSavedParseExceptions + ); + } + + @Override + public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir) + { + return new TestSeekableStreamIndexTaskTuningConfig( + getAppendableIndexSpec(), + getMaxRowsInMemory(), + getMaxBytesInMemory(), + isSkipBytesInMemoryOverheadCheck(), + getMaxRowsPerSegment(), + getMaxTotalRows(), + getIntermediatePersistPeriod(), + dir, + getMaxPendingPersists(),
Review Comment:
## Deprecated method or constructor invocation
Invoking `SeekableStreamIndexTaskTuningConfig.getMaxPendingPersists` should be avoided because it has been deprecated.
[Show more details](https://github.com/apache/druid/security/code-scanning/5871) ########## indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java: ########## @@ -0,0 +1,855 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.joda.JodaModule; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Binder; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import 
org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.DefaultGenericQueryMetricsFactory; +import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.scan.ScanQueryConfig; +import org.apache.druid.query.scan.ScanQueryEngine; +import org.apache.druid.query.scan.ScanQueryQueryToolChest; +import org.apache.druid.query.scan.ScanQueryRunnerFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesQueryEngine; +import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.incremental.RowMeters; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.apache.druid.server.security.AuthorizerMapper; +import 
org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +@SuppressWarnings("unchecked") +@RunWith(Parameterized.class) +public class SeekableStreamIndexTaskRunnerTest extends SeekableStreamIndexTaskTestBase +{ + + private static final String STREAM = "stream"; + + private static final String DATASOURCE = "test_ds"; + + private static final String MESSAGE = "{\"id\": 1, \"age\": 10, \"timestamp\":\"2023-09-01T00:00:00.000\"}"; + + private static final String BASE_PERSIST_DIR = "./tmp"; + + private static RecordSupplier<String, String, ByteEntity> recordSupplier; + + private static ServiceEmitter emitter; + + private static SeekableStreamIndexTaskRunner taskRunner; + + public SeekableStreamIndexTaskRunnerTest(LockGranularity lockGranularity) + { + super(lockGranularity); + } + + @Parameterized.Parameters(name = "{0}") + public static Iterable<Object[]> constructorFeeder() + { + return ImmutableList.of( + new Object[]{LockGranularity.TIME_CHUNK}, + new Object[]{LockGranularity.SEGMENT} + ); + } + + @BeforeClass + public static void setupClass() + { + emitter = new ServiceEmitter( + "service", + "host", + new NoopEmitter() + ); + emitter.start(); + EmittingLogger.registerEmitter(emitter); + + taskExec = MoreExecutors.listeningDecorator( + 
Executors.newCachedThreadPool( + Execs.makeThreadFactory("runner-task-test-%d") + ) + ); + } + + @Before + public void setup() throws IOException + { + reportsFile = File.createTempFile("IndexTaskTestReports-" + System.currentTimeMillis(), "json");
Review Comment:
## Local information disclosure in a temporary directory
Local information disclosure vulnerability: `File.createTempFile(prefix, suffix)` creates the reports file in the shared default temporary directory, where it may be readable by other local users.
[Show more details](https://github.com/apache/druid/security/code-scanning/5874)
########## indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java: ########## @@ -0,0 +1,855 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.joda.JodaModule; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Binder; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import 
org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.DefaultGenericQueryMetricsFactory; +import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.scan.ScanQueryConfig; +import org.apache.druid.query.scan.ScanQueryEngine; +import org.apache.druid.query.scan.ScanQueryQueryToolChest; +import org.apache.druid.query.scan.ScanQueryRunnerFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesQueryEngine; +import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.incremental.RowMeters; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.apache.druid.server.security.AuthorizerMapper; +import 
org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +@SuppressWarnings("unchecked") +@RunWith(Parameterized.class) +public class SeekableStreamIndexTaskRunnerTest extends SeekableStreamIndexTaskTestBase +{ + + private static final String STREAM = "stream"; + + private static final String DATASOURCE = "test_ds"; + + private static final String MESSAGE = "{\"id\": 1, \"age\": 10, \"timestamp\":\"2023-09-01T00:00:00.000\"}"; + + private static final String BASE_PERSIST_DIR = "./tmp"; + + private static RecordSupplier<String, String, ByteEntity> recordSupplier; + + private static ServiceEmitter emitter; + + private static SeekableStreamIndexTaskRunner taskRunner; + + public SeekableStreamIndexTaskRunnerTest(LockGranularity lockGranularity) + { + super(lockGranularity); + } + + @Parameterized.Parameters(name = "{0}") + public static Iterable<Object[]> constructorFeeder() + { + return ImmutableList.of( + new Object[]{LockGranularity.TIME_CHUNK}, + new Object[]{LockGranularity.SEGMENT} + ); + } + + @BeforeClass + public static void setupClass() + { + emitter = new ServiceEmitter( + "service", + "host", + new NoopEmitter() + ); + emitter.start(); + EmittingLogger.registerEmitter(emitter); + + taskExec = MoreExecutors.listeningDecorator( + 
Executors.newCachedThreadPool( + Execs.makeThreadFactory("runner-task-test-%d") + ) + ); + } + + @Before + public void setup() throws IOException + { + reportsFile = File.createTempFile("IndexTaskTestReports-" + System.currentTimeMillis(), "json"); + recordSupplier = new TestRecordSupplier(); + + TestUtils testUtils = new TestUtils(); + final ObjectMapper objectMapper = testUtils.getTestObjectMapper(); + + for (Module module : new TestIndexTaskModule().getJacksonModules()) { + objectMapper.registerModule(module); + } + + makeToolboxFactory(testUtils, emitter, false); + } + + @After + public void tearDownTest() throws IOException + { + synchronized (runningTasks) { + for (Task task : runningTasks) { + task.stopGracefully(toolboxFactory.build(task).getConfig()); + } + + runningTasks.clear(); + } + + reportsFile.delete(); + FileUtils.deleteDirectory(new File(BASE_PERSIST_DIR)); + destroyToolboxFactory(); + } + + @Test + public void testRunTaskWithoutIntermediateHandOff() throws ExecutionException, InterruptedException + { + TestSeekableStreamIndexTaskIOConfig taskIoConfig = new TestSeekableStreamIndexTaskIOConfig( + 0, + STREAM, + new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.singletonMap("0", "10"), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(STREAM, Collections.singletonMap("0", "20")), + null, + null, + null, + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false) + ); + + TestSeekableStreamIndexTaskTuningConfig taskTuningConfig = new TestSeekableStreamIndexTaskTuningConfig( + null, + null, + 10L, + false, + null, + null, + Period.seconds(1), + new File(BASE_PERSIST_DIR), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + SeekableStreamIndexTask task = new TestSeekableStreamIndexTask( + "id1", + null, + getDataSchema(), + taskTuningConfig, + taskIoConfig, + null, + "0" + ); + + taskRunner = new TestSeekableStreamIndexTaskRunner(task, 
getDataSchema().getParser(), task.authorizerMapper, LockGranularity.TIME_CHUNK); + + final ListenableFuture<TaskStatus> future = runTask(task); + Thread.sleep(5 * 1000L); + + Assert.assertEquals(0, countEvents(task)); + Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.READING, task.getRunner().getStatus()); + + taskRunner.pause(); + taskRunner.setEndOffsets(Collections.singletonMap("0", "11"), true); + + Thread.sleep(5 * 1000L); // wait for publishing segment + taskRunner.stopGracefully(); + + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + verifyTaskMetrics(task, RowMeters.with().bytes(MESSAGE.length()).unparseable(0).totalProcessed(1)); + + publishedDescriptors(); + publishedSegments(); + } + + @Test + public void testRunTaskWithIntermediateHandOff() throws ExecutionException, InterruptedException + { + TestSeekableStreamIndexTaskIOConfig taskIoConfig = new TestSeekableStreamIndexTaskIOConfig( + 0, + STREAM, + new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.singletonMap("0", "10"), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(STREAM, Collections.singletonMap("0", "20")), + null, + null, + null, + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false) + ); + + TestSeekableStreamIndexTaskTuningConfig taskTuningConfig = new TestSeekableStreamIndexTaskTuningConfig( + null, + null, + 10L, + false, + null, + null, + Period.seconds(1), + new File(BASE_PERSIST_DIR), + null, + null, + null, + null, + null, + null, + null, + Period.seconds(5), + null, + null, + null + ); + + + SeekableStreamIndexTask task = new TestSeekableStreamIndexTask( + "id1", + null, + getDataSchema(), + taskTuningConfig, + taskIoConfig, + null, + "0" + ); + + taskRunner = new TestSeekableStreamIndexTaskRunner(task, getDataSchema().getParser(), task.authorizerMapper, LockGranularity.TIME_CHUNK); + + final ListenableFuture<TaskStatus> future = runTask(task); + Thread.sleep(10 * 1000L); // 
> intermediateHandoffPeriod + + Assert.assertEquals(0, countEvents(task)); + Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.PAUSED, task.getRunner().getStatus()); + + // taskRunner.pause(); + taskRunner.setEndOffsets(Collections.singletonMap("0", "11"), true); + // taskRunner.possiblyResetDataSourceMetadata(this.toolbox, null, Collections.emptySet()); + + Thread.sleep(5 * 1000L); // wait for publishing segment + taskRunner.stopGracefully(); + + // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + verifyTaskMetrics(task, RowMeters.with().bytes(MESSAGE.length()).unparseable(0).totalProcessed(1)); + + publishedDescriptors(); + publishedSegments(); + } + + @Override + protected QueryRunnerFactoryConglomerate makeQueryRunnerConglomerate() + { + return new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder() + .put( + TimeseriesQuery.class, + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(), + new TimeseriesQueryEngine(), + (query, future) -> { + // do nothing + } + ) + ).put( + ScanQuery.class, + new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ), + new ScanQueryEngine(), + new ScanQueryConfig() + ) + ) + .build() + ); + } + + public static class TestRecordSupplier implements RecordSupplier<String, String, ByteEntity> + { + @Override + public void assign(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public void seek(StreamPartition<String> partition, String sequenceNumber) + { + } + + @Override + public void seekToEarliest(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public void seekToLatest(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public Collection<StreamPartition<String>> getAssignment() + { + return Collections.singletonList(new StreamPartition<>(STREAM, "0")); + } + + @Override 
+ public @NotNull List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long timeout) + { + return Collections.emptyList(); + } + + @Override + public String getLatestSequenceNumber(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public String getEarliestSequenceNumber(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public boolean isOffsetAvailable(StreamPartition<String> partition, OrderedSequenceNumber<String> offset) + { + return false; + } + + @Override + public String getPosition(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public Set<String> getPartitionIds(String stream) + { + return Sets.newHashSet("0"); + } + + @Override + public void close() + { + } + } + + public static class TestSeekableStreamSupervisorTuningConfig implements SeekableStreamSupervisorTuningConfig + { + @Override + public Integer getWorkerThreads() + { + return 1; + } + + @Override + public Long getChatRetries() + { + return 1L; + } + + @Override + public Duration getHttpTimeout() + { + return new Period("PT1M").toStandardDuration(); + } + + @Override + public Duration getShutdownTimeout() + { + return new Period("PT1S").toStandardDuration(); + } + + @Override + public Duration getRepartitionTransitionDuration() + { + return new Period("PT2M").toStandardDuration(); + } + + @Override + public Duration getOffsetFetchPeriod() + { + return new Period("PT5M").toStandardDuration(); + } + + @Override + public TestSeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() + { + return new TestSeekableStreamIndexTaskTuningConfig( + null, + null, + 10L, + false, + null, + null, + Period.seconds(1), + new File("./tmp"), + null, + null, + null, + null, + null, + null, + null, + Period.seconds(5), + null, + null, + null + ); + } + } + + public static class TestSeekableStreamIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig + { + @JsonCreator + public 
TestSeekableStreamIndexTaskTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, + @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, + @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod, + @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, + @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, + @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, + @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, + @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout, + @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically, + @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, + @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + ) + { + super( + appendableIndexSpec, + maxRowsInMemory, + maxBytesInMemory, + skipBytesInMemoryOverheadCheck, + maxRowsPerSegment, + maxTotalRows, + intermediatePersistPeriod, + basePersistDirectory, + maxPendingPersists, + indexSpec, + indexSpecForIntermediatePersists, + reportParseExceptions, + handoffConditionTimeout, + resetOffsetAutomatically, + false, + 
segmentWriteOutMediumFactory, + intermediateHandoffPeriod, + logParseExceptions, + maxParseExceptions, + maxSavedParseExceptions + ); + } + + @Override + public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir) + { + return new TestSeekableStreamIndexTaskTuningConfig( + getAppendableIndexSpec(), + getMaxRowsInMemory(), + getMaxBytesInMemory(), + isSkipBytesInMemoryOverheadCheck(), + getMaxRowsPerSegment(), + getMaxTotalRows(), + getIntermediatePersistPeriod(), + dir, + getMaxPendingPersists(), + getIndexSpec(), + getIndexSpecForIntermediatePersists(), + isReportParseExceptions(), + getHandoffConditionTimeout(), + isResetOffsetAutomatically(), + getSegmentWriteOutMediumFactory(), + getIntermediateHandoffPeriod(), + isLogParseExceptions(), + getMaxParseExceptions(), + getMaxSavedParseExceptions() + ); + } + + @Override + public String toString() + { + return "TestSeekableStreamIndexTaskTuningConfig{" + + "maxRowsInMemory=" + getMaxRowsInMemory() + + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + + ", maxTotalRows=" + getMaxTotalRows() + + ", maxBytesInMemory=" + getMaxBytesInMemory() + + ", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() + + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + + ", maxPendingPersists=" + getMaxPendingPersists() + Review Comment: ## Deprecated method or constructor invocation Invoking `SeekableStreamIndexTaskTuningConfig.getMaxPendingPersists` should be avoided because it has been deprecated. [Show more details](https://github.com/apache/druid/security/code-scanning/5872) ########## indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java: ########## @@ -0,0 +1,855 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.joda.JodaModule; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Binder; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatus; +import 
org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.DefaultGenericQueryMetricsFactory; +import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.scan.ScanQueryConfig; +import org.apache.druid.query.scan.ScanQueryEngine; +import org.apache.druid.query.scan.ScanQueryQueryToolChest; +import org.apache.druid.query.scan.ScanQueryRunnerFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesQueryEngine; +import 
org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.incremental.RowMeters; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.apache.druid.server.security.AuthorizerMapper; +import org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +@SuppressWarnings("unchecked") +@RunWith(Parameterized.class) +public class SeekableStreamIndexTaskRunnerTest extends SeekableStreamIndexTaskTestBase +{ + + private static final String STREAM = "stream"; + + private static final String DATASOURCE = "test_ds"; + + private static final String MESSAGE = "{\"id\": 1, \"age\": 10, \"timestamp\":\"2023-09-01T00:00:00.000\"}"; + + private static final String BASE_PERSIST_DIR = "./tmp"; + + private static RecordSupplier<String, String, ByteEntity> recordSupplier; + + private static ServiceEmitter emitter; + + private static SeekableStreamIndexTaskRunner taskRunner; + + public 
SeekableStreamIndexTaskRunnerTest(LockGranularity lockGranularity) + { + super(lockGranularity); + } + + @Parameterized.Parameters(name = "{0}") + public static Iterable<Object[]> constructorFeeder() + { + return ImmutableList.of( + new Object[]{LockGranularity.TIME_CHUNK}, + new Object[]{LockGranularity.SEGMENT} + ); + } + + @BeforeClass + public static void setupClass() + { + emitter = new ServiceEmitter( + "service", + "host", + new NoopEmitter() + ); + emitter.start(); + EmittingLogger.registerEmitter(emitter); + + taskExec = MoreExecutors.listeningDecorator( + Executors.newCachedThreadPool( + Execs.makeThreadFactory("runner-task-test-%d") + ) + ); + } + + @Before + public void setup() throws IOException + { + reportsFile = File.createTempFile("IndexTaskTestReports-" + System.currentTimeMillis(), "json"); + recordSupplier = new TestRecordSupplier(); + + TestUtils testUtils = new TestUtils(); + final ObjectMapper objectMapper = testUtils.getTestObjectMapper(); + + for (Module module : new TestIndexTaskModule().getJacksonModules()) { + objectMapper.registerModule(module); + } + + makeToolboxFactory(testUtils, emitter, false); + } + + @After + public void tearDownTest() throws IOException + { + synchronized (runningTasks) { + for (Task task : runningTasks) { + task.stopGracefully(toolboxFactory.build(task).getConfig()); + } + + runningTasks.clear(); + } + + reportsFile.delete(); + FileUtils.deleteDirectory(new File(BASE_PERSIST_DIR)); + destroyToolboxFactory(); + } + + @Test + public void testRunTaskWithoutIntermediateHandOff() throws ExecutionException, InterruptedException + { + TestSeekableStreamIndexTaskIOConfig taskIoConfig = new TestSeekableStreamIndexTaskIOConfig( + 0, + STREAM, + new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.singletonMap("0", "10"), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(STREAM, Collections.singletonMap("0", "20")), + null, + null, + null, + new JsonInputFormat(new JSONPathSpec(true, 
ImmutableList.of()), ImmutableMap.of(), false, false, false) + ); + + TestSeekableStreamIndexTaskTuningConfig taskTuningConfig = new TestSeekableStreamIndexTaskTuningConfig( + null, + null, + 10L, + false, + null, + null, + Period.seconds(1), + new File(BASE_PERSIST_DIR), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + SeekableStreamIndexTask task = new TestSeekableStreamIndexTask( + "id1", + null, + getDataSchema(), + taskTuningConfig, + taskIoConfig, + null, + "0" + ); + + taskRunner = new TestSeekableStreamIndexTaskRunner(task, getDataSchema().getParser(), task.authorizerMapper, LockGranularity.TIME_CHUNK); + + final ListenableFuture<TaskStatus> future = runTask(task); + Thread.sleep(5 * 1000L); + + Assert.assertEquals(0, countEvents(task)); + Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.READING, task.getRunner().getStatus()); + + taskRunner.pause(); + taskRunner.setEndOffsets(Collections.singletonMap("0", "11"), true); + + Thread.sleep(5 * 1000L); // wait for publishing segment + taskRunner.stopGracefully(); + + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + verifyTaskMetrics(task, RowMeters.with().bytes(MESSAGE.length()).unparseable(0).totalProcessed(1)); + + publishedDescriptors(); + publishedSegments(); + } + + @Test + public void testRunTaskWithIntermediateHandOff() throws ExecutionException, InterruptedException + { + TestSeekableStreamIndexTaskIOConfig taskIoConfig = new TestSeekableStreamIndexTaskIOConfig( + 0, + STREAM, + new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.singletonMap("0", "10"), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(STREAM, Collections.singletonMap("0", "20")), + null, + null, + null, + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false) + ); + + TestSeekableStreamIndexTaskTuningConfig taskTuningConfig = new TestSeekableStreamIndexTaskTuningConfig( + null, + null, 
+ 10L, + false, + null, + null, + Period.seconds(1), + new File(BASE_PERSIST_DIR), + null, + null, + null, + null, + null, + null, + null, + Period.seconds(5), + null, + null, + null + ); + + + SeekableStreamIndexTask task = new TestSeekableStreamIndexTask( + "id1", + null, + getDataSchema(), + taskTuningConfig, + taskIoConfig, + null, + "0" + ); + + taskRunner = new TestSeekableStreamIndexTaskRunner(task, getDataSchema().getParser(), task.authorizerMapper, LockGranularity.TIME_CHUNK); + + final ListenableFuture<TaskStatus> future = runTask(task); + Thread.sleep(10 * 1000L); // > intermediateHandoffPeriod + + Assert.assertEquals(0, countEvents(task)); + Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.PAUSED, task.getRunner().getStatus()); + + // taskRunner.pause(); + taskRunner.setEndOffsets(Collections.singletonMap("0", "11"), true); + // taskRunner.possiblyResetDataSourceMetadata(this.toolbox, null, Collections.emptySet()); + + Thread.sleep(5 * 1000L); // wait for publishing segment + taskRunner.stopGracefully(); + + // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + verifyTaskMetrics(task, RowMeters.with().bytes(MESSAGE.length()).unparseable(0).totalProcessed(1)); + + publishedDescriptors(); + publishedSegments(); + } + + @Override + protected QueryRunnerFactoryConglomerate makeQueryRunnerConglomerate() + { + return new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap.<Class<? 
extends Query>, QueryRunnerFactory>builder() + .put( + TimeseriesQuery.class, + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(), + new TimeseriesQueryEngine(), + (query, future) -> { + // do nothing + } + ) + ).put( + ScanQuery.class, + new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ), + new ScanQueryEngine(), + new ScanQueryConfig() + ) + ) + .build() + ); + } + + public static class TestRecordSupplier implements RecordSupplier<String, String, ByteEntity> + { + @Override + public void assign(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public void seek(StreamPartition<String> partition, String sequenceNumber) + { + } + + @Override + public void seekToEarliest(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public void seekToLatest(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public Collection<StreamPartition<String>> getAssignment() + { + return Collections.singletonList(new StreamPartition<>(STREAM, "0")); + } + + @Override + public @NotNull List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long timeout) + { + return Collections.emptyList(); + } + + @Override + public String getLatestSequenceNumber(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public String getEarliestSequenceNumber(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public boolean isOffsetAvailable(StreamPartition<String> partition, OrderedSequenceNumber<String> offset) + { + return false; + } + + @Override + public String getPosition(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public Set<String> getPartitionIds(String stream) + { + return Sets.newHashSet("0"); + } + + @Override + public void close() + { + } + } + + public static class TestSeekableStreamSupervisorTuningConfig implements SeekableStreamSupervisorTuningConfig + 
{ + @Override + public Integer getWorkerThreads() + { + return 1; + } + + @Override + public Long getChatRetries() + { + return 1L; + } + + @Override + public Duration getHttpTimeout() + { + return new Period("PT1M").toStandardDuration(); + } + + @Override + public Duration getShutdownTimeout() + { + return new Period("PT1S").toStandardDuration(); + } + + @Override + public Duration getRepartitionTransitionDuration() + { + return new Period("PT2M").toStandardDuration(); + } + + @Override + public Duration getOffsetFetchPeriod() + { + return new Period("PT5M").toStandardDuration(); + } + + @Override + public TestSeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() + { + return new TestSeekableStreamIndexTaskTuningConfig( + null, + null, + 10L, + false, + null, + null, + Period.seconds(1), + new File("./tmp"), + null, + null, + null, + null, + null, + null, + null, + Period.seconds(5), + null, + null, + null + ); + } + } + + public static class TestSeekableStreamIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig + { + @JsonCreator + public TestSeekableStreamIndexTaskTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, + @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, + @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod, + @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, + @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, + @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, + 
@Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, + @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout, + @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically, + @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, + @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + ) + { + super( + appendableIndexSpec, + maxRowsInMemory, + maxBytesInMemory, + skipBytesInMemoryOverheadCheck, + maxRowsPerSegment, + maxTotalRows, + intermediatePersistPeriod, + basePersistDirectory, + maxPendingPersists, + indexSpec, + indexSpecForIntermediatePersists, + reportParseExceptions, + handoffConditionTimeout, + resetOffsetAutomatically, + false, + segmentWriteOutMediumFactory, + intermediateHandoffPeriod, + logParseExceptions, + maxParseExceptions, + maxSavedParseExceptions + ); + } + + @Override + public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir) + { + return new TestSeekableStreamIndexTaskTuningConfig( + getAppendableIndexSpec(), + getMaxRowsInMemory(), + getMaxBytesInMemory(), + isSkipBytesInMemoryOverheadCheck(), + getMaxRowsPerSegment(), + getMaxTotalRows(), + getIntermediatePersistPeriod(), + dir, + getMaxPendingPersists(), + getIndexSpec(), + getIndexSpecForIntermediatePersists(), + isReportParseExceptions(), + getHandoffConditionTimeout(), + isResetOffsetAutomatically(), + getSegmentWriteOutMediumFactory(), + getIntermediateHandoffPeriod(), + isLogParseExceptions(), + getMaxParseExceptions(), + getMaxSavedParseExceptions() + ); + } + + @Override + public String toString() + { + return 
"TestSeekableStreamIndexTaskTuningConfig{" + + "maxRowsInMemory=" + getMaxRowsInMemory() + + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + + ", maxTotalRows=" + getMaxTotalRows() + + ", maxBytesInMemory=" + getMaxBytesInMemory() + + ", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() + + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + + ", maxPendingPersists=" + getMaxPendingPersists() + + ", indexSpec=" + getIndexSpec() + + ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() + + ", reportParseExceptions=" + isReportParseExceptions() + + ", handoffConditionTimeout=" + getHandoffConditionTimeout() + + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + + ", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() + + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + + ", logParseExceptions=" + isLogParseExceptions() + + ", maxParseExceptions=" + getMaxParseExceptions() + + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + + '}'; + } + } + + public static class TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity> + { + @JsonCreator + public TestSeekableStreamIndexTask( + @JsonProperty("id") String id, + @JsonProperty("resource") TaskResource taskResource, + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("tuningConfig") TestSeekableStreamIndexTaskTuningConfig tuningConfig, + @JsonProperty("ioConfig") TestSeekableStreamIndexTaskIOConfig ioConfig, + @JsonProperty("context") Map<String, Object> context, + @JsonProperty("groupId") String groupId + ) + { + super(id, + taskResource, + dataSchema, + tuningConfig, + ioConfig, + context, + groupId + ); + } + + @Override + protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner() + { + return taskRunner; + } + + @Override + protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier(final TaskToolbox toolbox) + { + 
return recordSupplier; + } + + @Override + public String getType() + { + return "index_test"; + } + } + + private static DataSchema getDataSchema() + { + List<DimensionSchema> dimensions = new ArrayList<>(); + dimensions.add(StringDimensionSchema.create("id")); + + return new DataSchema( + DATASOURCE, + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(dimensions), + new AggregatorFactory[]{ + // new CountAggregatorFactory("count") + }, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.MINUTE, + false, + ImmutableList.of() + ), + null + ); + } + + public static class TestSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner<String, String, ByteEntity> + { + public TestSeekableStreamIndexTaskRunner(SeekableStreamIndexTask<String, String, ByteEntity> task, InputRowParser<ByteBuffer> parser, AuthorizerMapper authorizerMapper, LockGranularity lockGranularityToUse) + { + super(task, parser, authorizerMapper, lockGranularityToUse); + } + + @Override + protected boolean isEndOfShard(String seqNum) + { + return false; + } + + @Override + protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(TaskToolbox toolbox, String checkpointsString) + { + return null; + } + + @Override + protected String getNextStartOffset(String sequenceNumber) + { + return sequenceNumber; + } + + @Override + protected SeekableStreamEndSequenceNumbers<String, String> deserializePartitionsFromMetadata(ObjectMapper mapper, Object object) + { + return mapper.convertValue(object, mapper.getTypeFactory().constructParametrizedType( + SeekableStreamEndSequenceNumbers.class, + SeekableStreamEndSequenceNumbers.class, + String.class, + String.class + )); Review Comment: ## Deprecated method or constructor invocation Invoking [TypeFactory.constructParametrizedType](1) should be avoided because it has been deprecated. 
[Show more details](https://github.com/apache/druid/security/code-scanning/5873) ########## indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java: ########## @@ -0,0 +1,855 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.indexing.seekablestream; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.Module; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.jsontype.NamedType; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.fasterxml.jackson.datatype.joda.JodaModule; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.inject.Binder; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.impl.ByteEntity; +import org.apache.druid.data.input.impl.DimensionSchema; +import org.apache.druid.data.input.impl.DimensionsSpec; +import org.apache.druid.data.input.impl.InputRowParser; +import org.apache.druid.data.input.impl.JsonInputFormat; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; +import org.apache.druid.indexer.TaskState; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.LockGranularity; +import org.apache.druid.indexing.common.TaskToolbox; +import org.apache.druid.indexing.common.TestUtils; +import org.apache.druid.indexing.common.task.Task; +import org.apache.druid.indexing.common.task.TaskResource; +import org.apache.druid.indexing.overlord.DataSourceMetadata; +import org.apache.druid.indexing.seekablestream.common.OrderedPartitionableRecord; +import org.apache.druid.indexing.seekablestream.common.OrderedSequenceNumber; +import org.apache.druid.indexing.seekablestream.common.RecordSupplier; +import 
org.apache.druid.indexing.seekablestream.common.StreamPartition; +import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig; +import org.apache.druid.initialization.DruidModule; +import org.apache.druid.java.util.common.FileUtils; +import org.apache.druid.java.util.common.concurrent.Execs; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.parsers.JSONPathSpec; +import org.apache.druid.java.util.emitter.EmittingLogger; +import org.apache.druid.java.util.emitter.core.NoopEmitter; +import org.apache.druid.java.util.emitter.service.ServiceEmitter; +import org.apache.druid.query.DefaultGenericQueryMetricsFactory; +import org.apache.druid.query.DefaultQueryRunnerFactoryConglomerate; +import org.apache.druid.query.Query; +import org.apache.druid.query.QueryRunnerFactory; +import org.apache.druid.query.QueryRunnerFactoryConglomerate; +import org.apache.druid.query.aggregation.AggregatorFactory; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.query.scan.ScanQueryConfig; +import org.apache.druid.query.scan.ScanQueryEngine; +import org.apache.druid.query.scan.ScanQueryQueryToolChest; +import org.apache.druid.query.scan.ScanQueryRunnerFactory; +import org.apache.druid.query.timeseries.TimeseriesQuery; +import org.apache.druid.query.timeseries.TimeseriesQueryEngine; +import org.apache.druid.query.timeseries.TimeseriesQueryQueryToolChest; +import org.apache.druid.query.timeseries.TimeseriesQueryRunnerFactory; +import org.apache.druid.segment.IndexSpec; +import org.apache.druid.segment.incremental.AppendableIndexSpec; +import org.apache.druid.segment.incremental.RowMeters; +import org.apache.druid.segment.indexing.DataSchema; +import org.apache.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory; +import org.apache.druid.server.security.AuthorizerMapper; +import 
org.joda.time.DateTime; +import org.joda.time.Duration; +import org.joda.time.Period; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import javax.annotation.Nullable; +import javax.validation.constraints.NotNull; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; + +@SuppressWarnings("unchecked") +@RunWith(Parameterized.class) +public class SeekableStreamIndexTaskRunnerTest extends SeekableStreamIndexTaskTestBase +{ + + private static final String STREAM = "stream"; + + private static final String DATASOURCE = "test_ds"; + + private static final String MESSAGE = "{\"id\": 1, \"age\": 10, \"timestamp\":\"2023-09-01T00:00:00.000\"}"; + + private static final String BASE_PERSIST_DIR = "./tmp"; + + private static RecordSupplier<String, String, ByteEntity> recordSupplier; + + private static ServiceEmitter emitter; + + private static SeekableStreamIndexTaskRunner taskRunner; + + public SeekableStreamIndexTaskRunnerTest(LockGranularity lockGranularity) + { + super(lockGranularity); + } + + @Parameterized.Parameters(name = "{0}") + public static Iterable<Object[]> constructorFeeder() + { + return ImmutableList.of( + new Object[]{LockGranularity.TIME_CHUNK}, + new Object[]{LockGranularity.SEGMENT} + ); + } + + @BeforeClass + public static void setupClass() + { + emitter = new ServiceEmitter( + "service", + "host", + new NoopEmitter() + ); + emitter.start(); + EmittingLogger.registerEmitter(emitter); + + taskExec = MoreExecutors.listeningDecorator( + 
Executors.newCachedThreadPool( + Execs.makeThreadFactory("runner-task-test-%d") + ) + ); + } + + @Before + public void setup() throws IOException + { + reportsFile = File.createTempFile("IndexTaskTestReports-" + System.currentTimeMillis(), "json"); + recordSupplier = new TestRecordSupplier(); + + TestUtils testUtils = new TestUtils(); + final ObjectMapper objectMapper = testUtils.getTestObjectMapper(); + + for (Module module : new TestIndexTaskModule().getJacksonModules()) { + objectMapper.registerModule(module); + } + + makeToolboxFactory(testUtils, emitter, false); + } + + @After + public void tearDownTest() throws IOException + { + synchronized (runningTasks) { + for (Task task : runningTasks) { + task.stopGracefully(toolboxFactory.build(task).getConfig()); + } + + runningTasks.clear(); + } + + reportsFile.delete(); + FileUtils.deleteDirectory(new File(BASE_PERSIST_DIR)); + destroyToolboxFactory(); + } + + @Test + public void testRunTaskWithoutIntermediateHandOff() throws ExecutionException, InterruptedException + { + TestSeekableStreamIndexTaskIOConfig taskIoConfig = new TestSeekableStreamIndexTaskIOConfig( + 0, + STREAM, + new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.singletonMap("0", "10"), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(STREAM, Collections.singletonMap("0", "20")), + null, + null, + null, + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false) + ); + + TestSeekableStreamIndexTaskTuningConfig taskTuningConfig = new TestSeekableStreamIndexTaskTuningConfig( + null, + null, + 10L, + false, + null, + null, + Period.seconds(1), + new File(BASE_PERSIST_DIR), + null, + null, + null, + null, + null, + null, + null, + null, + null, + null, + null + ); + + SeekableStreamIndexTask task = new TestSeekableStreamIndexTask( + "id1", + null, + getDataSchema(), + taskTuningConfig, + taskIoConfig, + null, + "0" + ); + + taskRunner = new TestSeekableStreamIndexTaskRunner(task, 
getDataSchema().getParser(), task.authorizerMapper, LockGranularity.TIME_CHUNK); + + final ListenableFuture<TaskStatus> future = runTask(task); + Thread.sleep(5 * 1000L); + + Assert.assertEquals(0, countEvents(task)); + Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.READING, task.getRunner().getStatus()); + + taskRunner.pause(); + taskRunner.setEndOffsets(Collections.singletonMap("0", "11"), true); + + Thread.sleep(5 * 1000L); // wait for publishing segment + taskRunner.stopGracefully(); + + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + verifyTaskMetrics(task, RowMeters.with().bytes(MESSAGE.length()).unparseable(0).totalProcessed(1)); + + publishedDescriptors(); + publishedSegments(); + } + + @Test + public void testRunTaskWithIntermediateHandOff() throws ExecutionException, InterruptedException + { + TestSeekableStreamIndexTaskIOConfig taskIoConfig = new TestSeekableStreamIndexTaskIOConfig( + 0, + STREAM, + new SeekableStreamStartSequenceNumbers<>(STREAM, Collections.singletonMap("0", "10"), Collections.emptySet()), + new SeekableStreamEndSequenceNumbers<>(STREAM, Collections.singletonMap("0", "20")), + null, + null, + null, + new JsonInputFormat(new JSONPathSpec(true, ImmutableList.of()), ImmutableMap.of(), false, false, false) + ); + + TestSeekableStreamIndexTaskTuningConfig taskTuningConfig = new TestSeekableStreamIndexTaskTuningConfig( + null, + null, + 10L, + false, + null, + null, + Period.seconds(1), + new File(BASE_PERSIST_DIR), + null, + null, + null, + null, + null, + null, + null, + Period.seconds(5), + null, + null, + null + ); + + + SeekableStreamIndexTask task = new TestSeekableStreamIndexTask( + "id1", + null, + getDataSchema(), + taskTuningConfig, + taskIoConfig, + null, + "0" + ); + + taskRunner = new TestSeekableStreamIndexTaskRunner(task, getDataSchema().getParser(), task.authorizerMapper, LockGranularity.TIME_CHUNK); + + final ListenableFuture<TaskStatus> future = runTask(task); + Thread.sleep(10 * 1000L); // 
> intermediateHandoffPeriod + + Assert.assertEquals(0, countEvents(task)); + Assert.assertEquals(SeekableStreamIndexTaskRunner.Status.PAUSED, task.getRunner().getStatus()); + + // taskRunner.pause(); + taskRunner.setEndOffsets(Collections.singletonMap("0", "11"), true); + // taskRunner.possiblyResetDataSourceMetadata(this.toolbox, null, Collections.emptySet()); + + Thread.sleep(5 * 1000L); // wait for publishing segment + taskRunner.stopGracefully(); + + // Wait for task to exit + Assert.assertEquals(TaskState.SUCCESS, future.get().getStatusCode()); + verifyTaskMetrics(task, RowMeters.with().bytes(MESSAGE.length()).unparseable(0).totalProcessed(1)); + + publishedDescriptors(); + publishedSegments(); + } + + @Override + protected QueryRunnerFactoryConglomerate makeQueryRunnerConglomerate() + { + return new DefaultQueryRunnerFactoryConglomerate( + ImmutableMap.<Class<? extends Query>, QueryRunnerFactory>builder() + .put( + TimeseriesQuery.class, + new TimeseriesQueryRunnerFactory( + new TimeseriesQueryQueryToolChest(), + new TimeseriesQueryEngine(), + (query, future) -> { + // do nothing + } + ) + ).put( + ScanQuery.class, + new ScanQueryRunnerFactory( + new ScanQueryQueryToolChest( + new ScanQueryConfig(), + new DefaultGenericQueryMetricsFactory() + ), + new ScanQueryEngine(), + new ScanQueryConfig() + ) + ) + .build() + ); + } + + public static class TestRecordSupplier implements RecordSupplier<String, String, ByteEntity> + { + @Override + public void assign(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public void seek(StreamPartition<String> partition, String sequenceNumber) + { + } + + @Override + public void seekToEarliest(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public void seekToLatest(Set<StreamPartition<String>> streamPartitions) + { + } + + @Override + public Collection<StreamPartition<String>> getAssignment() + { + return Collections.singletonList(new StreamPartition<>(STREAM, "0")); + } + + @Override 
+ public @NotNull List<OrderedPartitionableRecord<String, String, ByteEntity>> poll(long timeout) + { + return Collections.emptyList(); + } + + @Override + public String getLatestSequenceNumber(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public String getEarliestSequenceNumber(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public boolean isOffsetAvailable(StreamPartition<String> partition, OrderedSequenceNumber<String> offset) + { + return false; + } + + @Override + public String getPosition(StreamPartition<String> partition) + { + return "10"; + } + + @Override + public Set<String> getPartitionIds(String stream) + { + return Sets.newHashSet("0"); + } + + @Override + public void close() + { + } + } + + public static class TestSeekableStreamSupervisorTuningConfig implements SeekableStreamSupervisorTuningConfig + { + @Override + public Integer getWorkerThreads() + { + return 1; + } + + @Override + public Long getChatRetries() + { + return 1L; + } + + @Override + public Duration getHttpTimeout() + { + return new Period("PT1M").toStandardDuration(); + } + + @Override + public Duration getShutdownTimeout() + { + return new Period("PT1S").toStandardDuration(); + } + + @Override + public Duration getRepartitionTransitionDuration() + { + return new Period("PT2M").toStandardDuration(); + } + + @Override + public Duration getOffsetFetchPeriod() + { + return new Period("PT5M").toStandardDuration(); + } + + @Override + public TestSeekableStreamIndexTaskTuningConfig convertToTaskTuningConfig() + { + return new TestSeekableStreamIndexTaskTuningConfig( + null, + null, + 10L, + false, + null, + null, + Period.seconds(1), + new File("./tmp"), + null, + null, + null, + null, + null, + null, + null, + Period.seconds(5), + null, + null, + null + ); + } + } + + public static class TestSeekableStreamIndexTaskTuningConfig extends SeekableStreamIndexTaskTuningConfig + { + @JsonCreator + public 
TestSeekableStreamIndexTaskTuningConfig( + @JsonProperty("appendableIndexSpec") @Nullable AppendableIndexSpec appendableIndexSpec, + @JsonProperty("maxRowsInMemory") @Nullable Integer maxRowsInMemory, + @JsonProperty("maxBytesInMemory") @Nullable Long maxBytesInMemory, + @JsonProperty("skipBytesInMemoryOverheadCheck") @Nullable Boolean skipBytesInMemoryOverheadCheck, + @JsonProperty("maxRowsPerSegment") @Nullable Integer maxRowsPerSegment, + @JsonProperty("maxTotalRows") @Nullable Long maxTotalRows, + @JsonProperty("intermediatePersistPeriod") @Nullable Period intermediatePersistPeriod, + @JsonProperty("basePersistDirectory") @Nullable File basePersistDirectory, + @JsonProperty("maxPendingPersists") @Nullable Integer maxPendingPersists, + @JsonProperty("indexSpec") @Nullable IndexSpec indexSpec, + @JsonProperty("indexSpecForIntermediatePersists") @Nullable IndexSpec indexSpecForIntermediatePersists, + @Deprecated @JsonProperty("reportParseExceptions") @Nullable Boolean reportParseExceptions, + @JsonProperty("handoffConditionTimeout") @Nullable Long handoffConditionTimeout, + @JsonProperty("resetOffsetAutomatically") @Nullable Boolean resetOffsetAutomatically, + @JsonProperty("segmentWriteOutMediumFactory") @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory, + @JsonProperty("intermediateHandoffPeriod") @Nullable Period intermediateHandoffPeriod, + @JsonProperty("logParseExceptions") @Nullable Boolean logParseExceptions, + @JsonProperty("maxParseExceptions") @Nullable Integer maxParseExceptions, + @JsonProperty("maxSavedParseExceptions") @Nullable Integer maxSavedParseExceptions + ) + { + super( + appendableIndexSpec, + maxRowsInMemory, + maxBytesInMemory, + skipBytesInMemoryOverheadCheck, + maxRowsPerSegment, + maxTotalRows, + intermediatePersistPeriod, + basePersistDirectory, + maxPendingPersists, + indexSpec, + indexSpecForIntermediatePersists, + reportParseExceptions, + handoffConditionTimeout, + resetOffsetAutomatically, + false, + 
segmentWriteOutMediumFactory, + intermediateHandoffPeriod, + logParseExceptions, + maxParseExceptions, + maxSavedParseExceptions + ); + } + + @Override + public SeekableStreamIndexTaskTuningConfig withBasePersistDirectory(File dir) + { + return new TestSeekableStreamIndexTaskTuningConfig( + getAppendableIndexSpec(), + getMaxRowsInMemory(), + getMaxBytesInMemory(), + isSkipBytesInMemoryOverheadCheck(), + getMaxRowsPerSegment(), + getMaxTotalRows(), + getIntermediatePersistPeriod(), + dir, + getMaxPendingPersists(), + getIndexSpec(), + getIndexSpecForIntermediatePersists(), + isReportParseExceptions(), + getHandoffConditionTimeout(), + isResetOffsetAutomatically(), + getSegmentWriteOutMediumFactory(), + getIntermediateHandoffPeriod(), + isLogParseExceptions(), + getMaxParseExceptions(), + getMaxSavedParseExceptions() + ); + } + + @Override + public String toString() + { + return "TestSeekableStreamIndexTaskTuningConfig{" + + "maxRowsInMemory=" + getMaxRowsInMemory() + + ", maxRowsPerSegment=" + getMaxRowsPerSegment() + + ", maxTotalRows=" + getMaxTotalRows() + + ", maxBytesInMemory=" + getMaxBytesInMemory() + + ", skipBytesInMemoryOverheadCheck=" + isSkipBytesInMemoryOverheadCheck() + + ", intermediatePersistPeriod=" + getIntermediatePersistPeriod() + + ", maxPendingPersists=" + getMaxPendingPersists() + + ", indexSpec=" + getIndexSpec() + + ", indexSpecForIntermediatePersists=" + getIndexSpecForIntermediatePersists() + + ", reportParseExceptions=" + isReportParseExceptions() + + ", handoffConditionTimeout=" + getHandoffConditionTimeout() + + ", resetOffsetAutomatically=" + isResetOffsetAutomatically() + + ", segmentWriteOutMediumFactory=" + getSegmentWriteOutMediumFactory() + + ", intermediateHandoffPeriod=" + getIntermediateHandoffPeriod() + + ", logParseExceptions=" + isLogParseExceptions() + + ", maxParseExceptions=" + getMaxParseExceptions() + + ", maxSavedParseExceptions=" + getMaxSavedParseExceptions() + + '}'; + } + } + + public static class 
TestSeekableStreamIndexTask extends SeekableStreamIndexTask<String, String, ByteEntity> + { + @JsonCreator + public TestSeekableStreamIndexTask( + @JsonProperty("id") String id, + @JsonProperty("resource") TaskResource taskResource, + @JsonProperty("dataSchema") DataSchema dataSchema, + @JsonProperty("tuningConfig") TestSeekableStreamIndexTaskTuningConfig tuningConfig, + @JsonProperty("ioConfig") TestSeekableStreamIndexTaskIOConfig ioConfig, + @JsonProperty("context") Map<String, Object> context, + @JsonProperty("groupId") String groupId + ) + { + super(id, + taskResource, + dataSchema, + tuningConfig, + ioConfig, + context, + groupId + ); + } + + @Override + protected SeekableStreamIndexTaskRunner<String, String, ByteEntity> createTaskRunner() + { + return taskRunner; + } + + @Override + protected RecordSupplier<String, String, ByteEntity> newTaskRecordSupplier(final TaskToolbox toolbox) + { + return recordSupplier; + } + + @Override + public String getType() + { + return "index_test"; + } + } + + private static DataSchema getDataSchema() + { + List<DimensionSchema> dimensions = new ArrayList<>(); + dimensions.add(StringDimensionSchema.create("id")); + + return new DataSchema( + DATASOURCE, + new TimestampSpec("timestamp", "iso", null), + new DimensionsSpec(dimensions), + new AggregatorFactory[]{ + // new CountAggregatorFactory("count") + }, + new UniformGranularitySpec( + Granularities.HOUR, + Granularities.MINUTE, + false, + ImmutableList.of() + ), + null + ); + } + + public static class TestSeekableStreamIndexTaskRunner extends SeekableStreamIndexTaskRunner<String, String, ByteEntity> + { + public TestSeekableStreamIndexTaskRunner(SeekableStreamIndexTask<String, String, ByteEntity> task, InputRowParser<ByteBuffer> parser, AuthorizerMapper authorizerMapper, LockGranularity lockGranularityToUse) + { + super(task, parser, authorizerMapper, lockGranularityToUse); + } + + @Override + protected boolean isEndOfShard(String seqNum) + { + return false; + } + + @Override 
+ protected TreeMap<Integer, Map<String, String>> getCheckPointsFromContext(TaskToolbox toolbox, String checkpointsString) + { + return null; + } + + @Override + protected String getNextStartOffset(String sequenceNumber) + { + return sequenceNumber; + } + + @Override + protected SeekableStreamEndSequenceNumbers<String, String> deserializePartitionsFromMetadata(ObjectMapper mapper, Object object) + { + return mapper.convertValue(object, mapper.getTypeFactory().constructParametrizedType( + SeekableStreamEndSequenceNumbers.class, + SeekableStreamEndSequenceNumbers.class, + String.class, + String.class + )); + } + + @Override + protected @NotNull List<OrderedPartitionableRecord<String, String, ByteEntity>> getRecords(RecordSupplier<String, String, ByteEntity> recordSupplier, TaskToolbox toolbox) + { + return Collections.singletonList(new OrderedPartitionableRecord(STREAM, "0", "11", Collections.singletonList(new ByteEntity(MESSAGE.getBytes(StandardCharsets.UTF_8))))); + } + + @Override + protected SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadata(SeekableStreamSequenceNumbers<String, String> partitions) + { + return new TestSeekableStreamDataSourceMetadata(partitions); + } + + @Override + protected OrderedSequenceNumber<String> createSequenceNumber(String sequenceNumber) + { + return new TestSequenceNumber(sequenceNumber); + } + + @Override + protected void possiblyResetDataSourceMetadata(TaskToolbox toolbox, RecordSupplier<String, String, ByteEntity> recordSupplier, Set<StreamPartition<String>> assignment) + { + } + + @Override + protected boolean isEndOffsetExclusive() + { + return false; + } + + @Override + protected TypeReference<List<SequenceMetadata<String, String>>> getSequenceMetadataTypeReference() + { + return new TypeReference<List<SequenceMetadata<String, String>>>() + { + }; + } + } + + private static class TestSequenceNumber extends OrderedSequenceNumber<String> Review Comment: ## Inconsistent compareTo This class declares 
[compareTo](1) but inherits equals; the two could be inconsistent. [Show more details](https://github.com/apache/druid/security/code-scanning/5875) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
