This is an automated email from the ASF dual-hosted git repository.
jonwei pushed a commit to branch 0.16.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.16.1-incubating by this push:
new 7d24e38 [Backport] Retrying with a backward compatible task type on
unknown task type error in parallel indexing (#8905) (#8949)
7d24e38 is described below
commit 7d24e38b3051e2819955b8a1635110dfd8abb8f5
Author: Jonathan Wei <[email protected]>
AuthorDate: Tue Nov 26 23:38:44 2019 -0800
[Backport] Retrying with a backward compatible task type on unknown task
type error in parallel indexing (#8905) (#8949)
* Retrying with a backward compatible task type on unknown task type error
in parallel indexing (#8905)
* Retrying with a backward compatible task type on unknown task type error
in parallel indexing
* Register legacy class; add a serde test
* Backport fix, use firehoses
---
.../apache/druid/indexing/common/task/Task.java | 32 +++---
.../batch/parallel/LegacySinglePhaseSubTask.java | 68 +++++++++++++
.../task/batch/parallel/ParallelIndexIOConfig.java | 2 +-
.../task/batch/parallel/SinglePhaseSubTask.java | 1 +
.../batch/parallel/SinglePhaseSubTaskSpec.java | 17 ++++
.../common/task/batch/parallel/SubTaskSpec.java | 17 ++++
.../common/task/batch/parallel/TaskMonitor.java | 44 +++++++--
.../apache/druid/indexing/common/TestUtils.java | 3 +
.../common/task/NoopIndexTaskClientFactory.java} | 31 +++---
.../batch/parallel/SinglePhaseSubTaskSpecTest.java | 107 +++++++++++++++++++++
.../task/batch/parallel/TaskMonitorTest.java | 81 ++++++++++++++--
11 files changed, 355 insertions(+), 48 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 58612ce..4a51121 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -20,11 +20,13 @@
package org.apache.druid.indexing.common.task;
import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.actions.TaskActionClient;
import org.apache.druid.indexing.common.config.TaskConfig;
+import
org.apache.druid.indexing.common.task.batch.parallel.LegacySinglePhaseSubTask;
import
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
import
org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentGenerateTask;
import
org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentMergeTask;
@@ -48,21 +50,21 @@ import java.util.Map;
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
@JsonSubTypes(value = {
- @JsonSubTypes.Type(name = "kill", value = KillTask.class),
- @JsonSubTypes.Type(name = "move", value = MoveTask.class),
- @JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
- @JsonSubTypes.Type(name = "restore", value = RestoreTask.class),
- @JsonSubTypes.Type(name = "index", value = IndexTask.class),
- @JsonSubTypes.Type(name = ParallelIndexSupervisorTask.TYPE, value =
ParallelIndexSupervisorTask.class),
- @JsonSubTypes.Type(name = SinglePhaseSubTask.TYPE, value =
SinglePhaseSubTask.class),
- @JsonSubTypes.Type(name = "index_sub", value = SinglePhaseSubTask.class),
// for backward compatibility
- @JsonSubTypes.Type(name = PartialSegmentGenerateTask.TYPE, value =
PartialSegmentGenerateTask.class),
- @JsonSubTypes.Type(name = PartialSegmentMergeTask.TYPE, value =
PartialSegmentMergeTask.class),
- @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
- @JsonSubTypes.Type(name = "index_realtime", value =
RealtimeIndexTask.class),
- @JsonSubTypes.Type(name = "index_realtime_appenderator", value =
AppenderatorDriverRealtimeIndexTask.class),
- @JsonSubTypes.Type(name = "noop", value = NoopTask.class),
- @JsonSubTypes.Type(name = "compact", value = CompactionTask.class)
+ @Type(name = "kill", value = KillTask.class),
+ @Type(name = "move", value = MoveTask.class),
+ @Type(name = "archive", value = ArchiveTask.class),
+ @Type(name = "restore", value = RestoreTask.class),
+ @Type(name = "index", value = IndexTask.class),
+ @Type(name = ParallelIndexSupervisorTask.TYPE, value =
ParallelIndexSupervisorTask.class),
+ @Type(name = SinglePhaseSubTask.TYPE, value = SinglePhaseSubTask.class),
+ @Type(name = SinglePhaseSubTask.OLD_TYPE_NAME, value =
LegacySinglePhaseSubTask.class), // for backward compatibility
+ @Type(name = PartialSegmentGenerateTask.TYPE, value =
PartialSegmentGenerateTask.class),
+ @Type(name = PartialSegmentMergeTask.TYPE, value =
PartialSegmentMergeTask.class),
+ @Type(name = "index_hadoop", value = HadoopIndexTask.class),
+ @Type(name = "index_realtime", value = RealtimeIndexTask.class),
+ @Type(name = "index_realtime_appenderator", value =
AppenderatorDriverRealtimeIndexTask.class),
+ @Type(name = "noop", value = NoopTask.class),
+ @Type(name = "compact", value = CompactionTask.class)
})
public interface Task
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
new file mode 100644
index 0000000..4e842f2
--- /dev/null
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class LegacySinglePhaseSubTask extends SinglePhaseSubTask
+{
+ @JsonCreator
+ public LegacySinglePhaseSubTask(
+ @JsonProperty("id") @Nullable final String id,
+ @JsonProperty("groupId") final String groupId,
+ @JsonProperty("resource") final TaskResource taskResource,
+ @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+ @JsonProperty("numAttempts") final int numAttempts, // zero-based
counting
+ @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+ @JsonProperty("context") final Map<String, Object> context,
+ @JacksonInject IndexingServiceClient indexingServiceClient,
+ @JacksonInject IndexTaskClientFactory<ParallelIndexSupervisorTaskClient>
taskClientFactory,
+ @JacksonInject AppenderatorsManager appenderatorsManager
+ )
+ {
+ super(
+ id,
+ groupId,
+ taskResource,
+ supervisorTaskId,
+ numAttempts,
+ ingestionSchema,
+ context,
+ indexingServiceClient,
+ taskClientFactory,
+ appenderatorsManager
+ );
+ }
+
+ @Override
+ public String getType()
+ {
+ return SinglePhaseSubTask.OLD_TYPE_NAME;
+ }
+}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
index 2e4ea8d..a87da9a 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
@@ -35,7 +35,7 @@ public class ParallelIndexIOConfig extends IndexIOConfig
{
@JsonCreator
public ParallelIndexIOConfig(
- @JsonProperty("firehose") FirehoseFactory firehoseFactory,
+ @JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory,
@JsonProperty("appendToExisting") @Nullable Boolean appendToExisting
)
{
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 9b2d43d..eaa9e2e 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -96,6 +96,7 @@ import java.util.stream.Collectors;
public class SinglePhaseSubTask extends AbstractBatchIndexTask
{
public static final String TYPE = "single_phase_sub_task";
+ public static final String OLD_TYPE_NAME = "index_sub";
private static final Logger LOG = new Logger(SinglePhaseSubTask.class);
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java
index 8df941c..2afa7ea 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java
@@ -66,4 +66,21 @@ class SinglePhaseSubTaskSpec extends
SubTaskSpec<SinglePhaseSubTask>
new DummyForInjectionAppenderatorsManager()
);
}
+
+ @Override
+ public SinglePhaseSubTask newSubTaskWithBackwardCompatibleType(int
numAttempts)
+ {
+ return new LegacySinglePhaseSubTask(
+ null,
+ getGroupId(),
+ null,
+ getSupervisorTaskId(),
+ numAttempts,
+ getIngestionSpec(),
+ getContext(),
+ null,
+ null,
+ new DummyForInjectionAppenderatorsManager()
+ );
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java
index c11552e..f23f260 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java
@@ -80,5 +80,22 @@ public abstract class SubTaskSpec<T extends Task>
return inputSplit;
}
+ /**
+ * Creates a new task for this SubTaskSpec.
+ */
public abstract T newSubTask(int numAttempts);
+
+ /**
+ * Creates a new task but with a backward compatible type for this
SubTaskSpec. This is to support rolling
+ * updates for the parallel indexing task; subclasses should override this
method if their type name has changed
+ * between releases. See
https://github.com/apache/incubator-druid/issues/8836 for more details.
+ *
+ * This method will be called if {@link #newSubTask} fails with an {@link
IllegalStateException} with an error
+ * message starting with "Could not resolve type id". The failure of {@link
#newSubTask} with this error is NOT
+ * recorded as a failed attempt in {@link TaskHistory}.
+ */
+ public T newSubTaskWithBackwardCompatibleType(int numAttempts)
+ {
+ return newSubTask(numAttempts);
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
index 33cc804..bb5ef75 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
@@ -220,11 +220,10 @@ public class TaskMonitor<T extends Task>
{
synchronized (startStopLock) {
if (!running) {
- return Futures.immediateFailedFuture(new ISE("TaskMonitore is not
running"));
+ return Futures.immediateFailedFuture(new ISE("TaskMonitor is not
running"));
}
- final T task = spec.newSubTask(0);
- log.info("Submitting a new task[%s] for spec[%s]", task.getId(),
spec.getId());
- indexingServiceClient.runTask(task);
+ final T task = submitTask(spec, 0);
+ log.info("Submitted a new task[%s] for spec[%s]", task.getId(),
spec.getId());
incrementNumRunningTasks();
final SettableFuture<SubTaskCompleteEvent<T>> taskFuture =
SettableFuture.create();
@@ -246,9 +245,8 @@ public class TaskMonitor<T extends Task>
synchronized (startStopLock) {
if (running) {
final SubTaskSpec<T> spec = monitorEntry.spec;
- final T task = spec.newSubTask(monitorEntry.taskHistory.size() + 1);
- log.info("Submitting a new task[%s] for retrying spec[%s]",
task.getId(), spec.getId());
- indexingServiceClient.runTask(task);
+ final T task = submitTask(spec, monitorEntry.taskHistory.size() + 1);
+ log.info("Submitted a new task[%s] for retrying spec[%s]",
task.getId(), spec.getId());
incrementNumRunningTasks();
runningTasks.put(
@@ -263,6 +261,38 @@ public class TaskMonitor<T extends Task>
}
}
+ private T submitTask(SubTaskSpec<T> spec, int numAttempts)
+ {
+ T task = spec.newSubTask(numAttempts);
+ try {
+ indexingServiceClient.runTask(task);
+ }
+ catch (Exception e) {
+ if (isUnknownTypeIdException(e)) {
+ log.warn(e, "Got an unknown type id error. Retrying with a backward
compatible type.");
+ task = spec.newSubTaskWithBackwardCompatibleType(numAttempts);
+ indexingServiceClient.runTask(task);
+ } else {
+ throw e;
+ }
+ }
+ return task;
+ }
+
+ private boolean isUnknownTypeIdException(Throwable e)
+ {
+ if (e instanceof IllegalStateException) {
+ if (e.getMessage() != null && e.getMessage().contains("Could not resolve
type id")) {
+ return true;
+ }
+ }
+ if (e.getCause() != null) {
+ return isUnknownTypeIdException(e.getCause());
+ } else {
+ return false;
+ }
+ }
+
private void incrementNumRunningTasks()
{
synchronized (taskCountLock) {
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
index 33011f9..175821d 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
@@ -28,6 +28,8 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
import org.apache.druid.client.indexing.NoopIndexingServiceClient;
import
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
+import org.apache.druid.indexing.common.task.NoopIndexTaskClientFactory;
import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.ISE;
@@ -92,6 +94,7 @@ public class TestUtils
.addValue(AuthorizerMapper.class, new
AuthorizerMapper(ImmutableMap.of()))
.addValue(AppenderatorsManager.class, new
TestAppenderatorsManager())
.addValue(LocalDataSegmentPuller.class, new
LocalDataSegmentPuller())
+ .addValue(IndexTaskClientFactory.class, new
NoopIndexTaskClientFactory())
);
jsonMapper.registerModule(
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopIndexTaskClientFactory.java
similarity index 51%
copy from
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
copy to
indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopIndexTaskClientFactory.java
index 2e4ea8d..8138f7c 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopIndexTaskClientFactory.java
@@ -17,28 +17,23 @@
* under the License.
*/
-package org.apache.druid.indexing.common.task.batch.parallel;
+package org.apache.druid.indexing.common.task;
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.druid.data.input.FirehoseFactory;
-import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import org.apache.druid.indexing.common.IndexTaskClient;
+import org.apache.druid.indexing.common.TaskInfoProvider;
+import org.joda.time.Duration;
-import javax.annotation.Nullable;
-
-/**
- * Same with {@link IndexIOConfig} except its JSON type name.
- */
-@JsonTypeName("index_parallel")
-public class ParallelIndexIOConfig extends IndexIOConfig
+public class NoopIndexTaskClientFactory implements IndexTaskClientFactory
{
- @JsonCreator
- public ParallelIndexIOConfig(
- @JsonProperty("firehose") FirehoseFactory firehoseFactory,
- @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting
+ @Override
+ public IndexTaskClient build(
+ TaskInfoProvider taskInfoProvider,
+ String callerId,
+ int numThreads,
+ Duration httpTimeout,
+ long numRetries
)
{
- super(firehoseFactory, appendToExisting);
+ throw new UnsupportedOperationException();
}
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java
new file mode 100644
index 0000000..e38dcb3
--- /dev/null
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class SinglePhaseSubTaskSpecTest
+{
+ private static final ObjectMapper MAPPER = new
TestUtils().getTestObjectMapper();
+
+ private static ParallelIndexIngestionSpec createParallelIndexIngestionSpec()
throws IOException
+ {
+ final InputRowParser parser = new StringInputRowParser(
+ new JSONParseSpec(
+ new TimestampSpec(null, null, null),
+ new DimensionsSpec(null),
+ null,
+ null
+ ),
+ StringUtils.UTF8_STRING
+ );
+ final Map<String, Object> parserMap =
MAPPER.readValue(MAPPER.writeValueAsBytes(parser), Map.class);
+ return new ParallelIndexIngestionSpec(
+ new DataSchema(
+ "dataSource",
+ parserMap,
+ new AggregatorFactory[0],
+ null,
+ null,
+ MAPPER
+ ),
+ new ParallelIndexIOConfig(
+ new LocalFirehoseFactory(new File("baseDir"), "filter", null),
+ null
+ ),
+ null
+ );
+ }
+
+ private SinglePhaseSubTaskSpec spec;
+
+ @Before
+ public void setup() throws IOException
+ {
+ spec = new SinglePhaseSubTaskSpec(
+ "id",
+ "groupId",
+ "supervisorTaskId",
+ createParallelIndexIngestionSpec(),
+ null,
+ new InputSplit<>("string split")
+ );
+ }
+
+ @Test
+ public void testNewSubTaskType() throws IOException
+ {
+ final SinglePhaseSubTask expected = spec.newSubTask(0);
+ final byte[] json = MAPPER.writeValueAsBytes(expected);
+ final Map<String, Object> actual = MAPPER.readValue(json, Map.class);
+ Assert.assertEquals(SinglePhaseSubTask.TYPE, actual.get("type"));
+ }
+
+ @Test
+ public void testNewSubTaskWithBackwardCompatibleType() throws IOException
+ {
+ final SinglePhaseSubTask expected =
spec.newSubTaskWithBackwardCompatibleType(0);
+ final byte[] json = MAPPER.writeValueAsBytes(expected);
+ final Map<String, Object> actual = MAPPER.readValue(json, Map.class);
+ Assert.assertEquals(SinglePhaseSubTask.OLD_TYPE_NAME, actual.get("type"));
+ }
+}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
index 6043768..07a231e 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
import org.apache.druid.indexing.common.task.NoopTask;
import
org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.junit.After;
import org.junit.Assert;
@@ -77,7 +78,7 @@ public class TaskMonitorTest
final List<ListenableFuture<SubTaskCompleteEvent<TestTask>>> futures =
IntStream
.range(0, 10)
.mapToObj(i -> monitor.submit(
- new TestTaskSpec("specId" + i, "groupId", "supervisorId", null,
new IntegerInputSplit(i), 100L, 0)
+ new TestTaskSpec("specId" + i, "groupId", "supervisorId", null,
new IntegerInputSplit(i), 100L, 0, false)
))
.collect(Collectors.toList());
for (int i = 0; i < futures.size(); i++) {
@@ -98,7 +99,16 @@ public class TaskMonitorTest
final List<TestTaskSpec> specs = IntStream
.range(0, 10)
.mapToObj(
- i -> new TestTaskSpec("specId" + i, "groupId", "supervisorId",
null, new IntegerInputSplit(i), 100L, 2)
+ i -> new TestTaskSpec(
+ "specId" + i,
+ "groupId",
+ "supervisorId",
+ null,
+ new IntegerInputSplit(i),
+ 100L,
+ 2,
+ false
+ )
)
.collect(Collectors.toList());
final List<ListenableFuture<SubTaskCompleteEvent<TestTask>>> futures =
specs
@@ -127,43 +137,97 @@ public class TaskMonitorTest
}
}
+ @Test
+ public void testResubmitWithOldType() throws InterruptedException,
ExecutionException, TimeoutException
+ {
+ final List<TestTaskSpec> specs = IntStream
+ .range(0, 10)
+ .mapToObj(
+ i -> new TestTaskSpec(
+ "specId" + i,
+ "groupId",
+ "supervisorId",
+ null,
+ new IntegerInputSplit(i),
+ 100L,
+ 0,
+ true
+ )
+ )
+ .collect(Collectors.toList());
+ final List<ListenableFuture<SubTaskCompleteEvent<TestTask>>> futures =
specs
+ .stream()
+ .map(monitor::submit)
+ .collect(Collectors.toList());
+ for (int i = 0; i < futures.size(); i++) {
+ // # of threads of taskRunner is 5, and each spec is submitted only once
(the unknown-type-id error is
+ // retried immediately with the backward compatible type inside
submitTask), so 2 sec per future is enough.
+ final SubTaskCompleteEvent<TestTask> result = futures.get(i).get(2,
TimeUnit.SECONDS);
+ Assert.assertEquals("supervisorId",
result.getSpec().getSupervisorTaskId());
+ Assert.assertEquals("specId" + i, result.getSpec().getId());
+
+ Assert.assertNotNull(result.getLastStatus());
+ Assert.assertEquals(TaskState.SUCCESS,
result.getLastStatus().getStatusCode());
+ Assert.assertEquals(TaskState.SUCCESS, result.getLastState());
+
+ final TaskHistory<TestTask> taskHistory =
monitor.getCompleteSubTaskSpecHistory(specs.get(i).getId());
+ Assert.assertNotNull(taskHistory);
+
+ final List<TaskStatusPlus> attemptHistory =
taskHistory.getAttemptHistory();
+ Assert.assertNotNull(attemptHistory);
+ Assert.assertEquals(1, attemptHistory.size());
+ Assert.assertEquals(TaskState.SUCCESS,
attemptHistory.get(0).getStatusCode());
+ }
+ }
+
private static class TestTaskSpec extends SubTaskSpec<TestTask>
{
private final long runTime;
private final int numMaxFails;
+ private final boolean throwUnknownTypeIdError;
private int numFails;
- public TestTaskSpec(
+ TestTaskSpec(
String id,
String groupId,
String supervisorTaskId,
Map<String, Object> context,
InputSplit inputSplit,
long runTime,
- int numMaxFails
+ int numMaxFails,
+ boolean throwUnknownTypeIdError
)
{
super(id, groupId, supervisorTaskId, context, inputSplit);
this.runTime = runTime;
this.numMaxFails = numMaxFails;
+ this.throwUnknownTypeIdError = throwUnknownTypeIdError;
}
@Override
public TestTask newSubTask(int numAttempts)
{
- return new TestTask(getId(), runTime, numFails++ < numMaxFails);
+ return new TestTask(getId(), runTime, numFails++ < numMaxFails,
throwUnknownTypeIdError);
+ }
+
+ @Override
+ public TestTask newSubTaskWithBackwardCompatibleType(int numAttempts)
+ {
+ return new TestTask(getId(), runTime, numFails++ < numMaxFails, false);
}
}
private static class TestTask extends NoopTask
{
private final boolean shouldFail;
+ private final boolean throwUnknownTypeIdError;
- TestTask(String id, long runTime, boolean shouldFail)
+ TestTask(String id, long runTime, boolean shouldFail, boolean
throwUnknownTypeIdError)
{
super(id, null, "testDataSource", runTime, 0, null, null, null);
this.shouldFail = shouldFail;
+ this.throwUnknownTypeIdError = throwUnknownTypeIdError;
}
@Override
@@ -185,6 +249,9 @@ public class TaskMonitorTest
{
final TestTask task = (TestTask) taskObject;
tasks.put(task.getId(), TaskState.RUNNING);
+ if (task.throwUnknownTypeIdError) {
+ throw new RuntimeException(new ISE("Could not resolve type id
'test_task_id'"));
+ }
taskRunner.submit(() -> tasks.put(task.getId(),
task.run(null).getStatusCode()));
return task.getId();
}
@@ -213,7 +280,7 @@ public class TaskMonitorTest
private static class IntegerInputSplit extends InputSplit<Integer>
{
- public IntegerInputSplit(int split)
+ IntegerInputSplit(int split)
{
super(split);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]