This is an automated email from the ASF dual-hosted git repository.

jonwei pushed a commit to branch 0.16.1-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.16.1-incubating by this push:
     new 7d24e38  [Backport] Retrying with a backward compatible task type on 
unknown task type error in parallel indexing (#8905) (#8949)
7d24e38 is described below

commit 7d24e38b3051e2819955b8a1635110dfd8abb8f5
Author: Jonathan Wei <[email protected]>
AuthorDate: Tue Nov 26 23:38:44 2019 -0800

    [Backport] Retrying with a backward compatible task type on unknown task 
type error in parallel indexing (#8905) (#8949)
    
    * Retrying with a backward compatible task type on unknown task type error 
in parallel indexing (#8905)
    
    * Retrying with a backward compatible task type on unknown task type error 
in parallel indexing
    
    * Register legacy class; add a serde test
    
    * Backport fix, use firehoses
---
 .../apache/druid/indexing/common/task/Task.java    |  32 +++---
 .../batch/parallel/LegacySinglePhaseSubTask.java   |  68 +++++++++++++
 .../task/batch/parallel/ParallelIndexIOConfig.java |   2 +-
 .../task/batch/parallel/SinglePhaseSubTask.java    |   1 +
 .../batch/parallel/SinglePhaseSubTaskSpec.java     |  17 ++++
 .../common/task/batch/parallel/SubTaskSpec.java    |  17 ++++
 .../common/task/batch/parallel/TaskMonitor.java    |  44 +++++++--
 .../apache/druid/indexing/common/TestUtils.java    |   3 +
 .../common/task/NoopIndexTaskClientFactory.java}   |  31 +++---
 .../batch/parallel/SinglePhaseSubTaskSpecTest.java | 107 +++++++++++++++++++++
 .../task/batch/parallel/TaskMonitorTest.java       |  81 ++++++++++++++--
 11 files changed, 355 insertions(+), 48 deletions(-)

diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
index 58612ce..4a51121 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/Task.java
@@ -20,11 +20,13 @@
 package org.apache.druid.indexing.common.task;
 
 import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonSubTypes.Type;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.actions.TaskActionClient;
 import org.apache.druid.indexing.common.config.TaskConfig;
+import 
org.apache.druid.indexing.common.task.batch.parallel.LegacySinglePhaseSubTask;
 import 
org.apache.druid.indexing.common.task.batch.parallel.ParallelIndexSupervisorTask;
 import 
org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentGenerateTask;
 import 
org.apache.druid.indexing.common.task.batch.parallel.PartialSegmentMergeTask;
@@ -48,21 +50,21 @@ import java.util.Map;
  */
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type")
 @JsonSubTypes(value = {
-    @JsonSubTypes.Type(name = "kill", value = KillTask.class),
-    @JsonSubTypes.Type(name = "move", value = MoveTask.class),
-    @JsonSubTypes.Type(name = "archive", value = ArchiveTask.class),
-    @JsonSubTypes.Type(name = "restore", value = RestoreTask.class),
-    @JsonSubTypes.Type(name = "index", value = IndexTask.class),
-    @JsonSubTypes.Type(name = ParallelIndexSupervisorTask.TYPE, value = 
ParallelIndexSupervisorTask.class),
-    @JsonSubTypes.Type(name = SinglePhaseSubTask.TYPE, value = 
SinglePhaseSubTask.class),
-    @JsonSubTypes.Type(name = "index_sub", value = SinglePhaseSubTask.class), 
// for backward compatibility
-    @JsonSubTypes.Type(name = PartialSegmentGenerateTask.TYPE, value = 
PartialSegmentGenerateTask.class),
-    @JsonSubTypes.Type(name = PartialSegmentMergeTask.TYPE, value = 
PartialSegmentMergeTask.class),
-    @JsonSubTypes.Type(name = "index_hadoop", value = HadoopIndexTask.class),
-    @JsonSubTypes.Type(name = "index_realtime", value = 
RealtimeIndexTask.class),
-    @JsonSubTypes.Type(name = "index_realtime_appenderator", value = 
AppenderatorDriverRealtimeIndexTask.class),
-    @JsonSubTypes.Type(name = "noop", value = NoopTask.class),
-    @JsonSubTypes.Type(name = "compact", value = CompactionTask.class)
+    @Type(name = "kill", value = KillTask.class),
+    @Type(name = "move", value = MoveTask.class),
+    @Type(name = "archive", value = ArchiveTask.class),
+    @Type(name = "restore", value = RestoreTask.class),
+    @Type(name = "index", value = IndexTask.class),
+    @Type(name = ParallelIndexSupervisorTask.TYPE, value = 
ParallelIndexSupervisorTask.class),
+    @Type(name = SinglePhaseSubTask.TYPE, value = SinglePhaseSubTask.class),
+    @Type(name = SinglePhaseSubTask.OLD_TYPE_NAME, value = 
LegacySinglePhaseSubTask.class), // for backward compatibility
+    @Type(name = PartialSegmentGenerateTask.TYPE, value = 
PartialSegmentGenerateTask.class),
+    @Type(name = PartialSegmentMergeTask.TYPE, value = 
PartialSegmentMergeTask.class),
+    @Type(name = "index_hadoop", value = HadoopIndexTask.class),
+    @Type(name = "index_realtime", value = RealtimeIndexTask.class),
+    @Type(name = "index_realtime_appenderator", value = 
AppenderatorDriverRealtimeIndexTask.class),
+    @Type(name = "noop", value = NoopTask.class),
+    @Type(name = "compact", value = CompactionTask.class)
 })
 public interface Task
 {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
new file mode 100644
index 0000000..4e842f2
--- /dev/null
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/LegacySinglePhaseSubTask.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.client.indexing.IndexingServiceClient;
+import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
+import org.apache.druid.indexing.common.task.TaskResource;
+import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+public class LegacySinglePhaseSubTask extends SinglePhaseSubTask
+{
+  @JsonCreator
+  public LegacySinglePhaseSubTask(
+      @JsonProperty("id") @Nullable final String id,
+      @JsonProperty("groupId") final String groupId,
+      @JsonProperty("resource") final TaskResource taskResource,
+      @JsonProperty("supervisorTaskId") final String supervisorTaskId,
+      @JsonProperty("numAttempts") final int numAttempts, // zero-based 
counting
+      @JsonProperty("spec") final ParallelIndexIngestionSpec ingestionSchema,
+      @JsonProperty("context") final Map<String, Object> context,
+      @JacksonInject IndexingServiceClient indexingServiceClient,
+      @JacksonInject IndexTaskClientFactory<ParallelIndexSupervisorTaskClient> 
taskClientFactory,
+      @JacksonInject AppenderatorsManager appenderatorsManager
+  )
+  {
+    super(
+        id,
+        groupId,
+        taskResource,
+        supervisorTaskId,
+        numAttempts,
+        ingestionSchema,
+        context,
+        indexingServiceClient,
+        taskClientFactory,
+        appenderatorsManager
+    );
+  }
+
+  @Override
+  public String getType()
+  {
+    return SinglePhaseSubTask.OLD_TYPE_NAME;
+  }
+}
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
index 2e4ea8d..a87da9a 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
@@ -35,7 +35,7 @@ public class ParallelIndexIOConfig extends IndexIOConfig
 {
   @JsonCreator
   public ParallelIndexIOConfig(
-      @JsonProperty("firehose") FirehoseFactory firehoseFactory,
+      @JsonProperty("firehose") @Nullable FirehoseFactory firehoseFactory,
       @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting
   )
   {
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
index 9b2d43d..eaa9e2e 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTask.java
@@ -96,6 +96,7 @@ import java.util.stream.Collectors;
 public class SinglePhaseSubTask extends AbstractBatchIndexTask
 {
   public static final String TYPE = "single_phase_sub_task";
+  public static final String OLD_TYPE_NAME = "index_sub";
 
   private static final Logger LOG = new Logger(SinglePhaseSubTask.class);
 
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java
index 8df941c..2afa7ea 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpec.java
@@ -66,4 +66,21 @@ class SinglePhaseSubTaskSpec extends 
SubTaskSpec<SinglePhaseSubTask>
         new DummyForInjectionAppenderatorsManager()
     );
   }
+
+  @Override
+  public SinglePhaseSubTask newSubTaskWithBackwardCompatibleType(int 
numAttempts)
+  {
+    return new LegacySinglePhaseSubTask(
+        null,
+        getGroupId(),
+        null,
+        getSupervisorTaskId(),
+        numAttempts,
+        getIngestionSpec(),
+        getContext(),
+        null,
+        null,
+        new DummyForInjectionAppenderatorsManager()
+    );
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java
index c11552e..f23f260 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SubTaskSpec.java
@@ -80,5 +80,22 @@ public abstract class SubTaskSpec<T extends Task>
     return inputSplit;
   }
 
+  /**
+   * Creates a new task for this SubTaskSpec.
+   */
   public abstract T newSubTask(int numAttempts);
+
+  /**
+   * Creates a new task but with a backward compatible type for this 
SubTaskSpec. This is to support rolling updates
+   * of parallel indexing tasks. Subclasses should override this method if 
their type name has changed between
+   * releases. See https://github.com/apache/incubator-druid/issues/8836 for 
more details.
+   *
+   * This method will be called if submitting the task created by {@link 
#newSubTask} fails with an exception whose
+   * cause chain has an {@link IllegalStateException} with a message containing 
"Could not resolve type id". Such a
+   * failed submission is NOT recorded as a failed attempt in {@link TaskHistory}.
+   */
+  public T newSubTaskWithBackwardCompatibleType(int numAttempts)
+  {
+    return newSubTask(numAttempts);
+  }
 }
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
index 33cc804..bb5ef75 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitor.java
@@ -220,11 +220,10 @@ public class TaskMonitor<T extends Task>
   {
     synchronized (startStopLock) {
       if (!running) {
-        return Futures.immediateFailedFuture(new ISE("TaskMonitore is not 
running"));
+        return Futures.immediateFailedFuture(new ISE("TaskMonitor is not 
running"));
       }
-      final T task = spec.newSubTask(0);
-      log.info("Submitting a new task[%s] for spec[%s]", task.getId(), 
spec.getId());
-      indexingServiceClient.runTask(task);
+      final T task = submitTask(spec, 0);
+      log.info("Submitted a new task[%s] for spec[%s]", task.getId(), 
spec.getId());
       incrementNumRunningTasks();
 
       final SettableFuture<SubTaskCompleteEvent<T>> taskFuture = 
SettableFuture.create();
@@ -246,9 +245,8 @@ public class TaskMonitor<T extends Task>
     synchronized (startStopLock) {
       if (running) {
         final SubTaskSpec<T> spec = monitorEntry.spec;
-        final T task = spec.newSubTask(monitorEntry.taskHistory.size() + 1);
-        log.info("Submitting a new task[%s] for retrying spec[%s]", 
task.getId(), spec.getId());
-        indexingServiceClient.runTask(task);
+        final T task = submitTask(spec, monitorEntry.taskHistory.size() + 1);
+        log.info("Submitted a new task[%s] for retrying spec[%s]", 
task.getId(), spec.getId());
         incrementNumRunningTasks();
 
         runningTasks.put(
@@ -263,6 +261,38 @@ public class TaskMonitor<T extends Task>
     }
   }
 
+  private T submitTask(SubTaskSpec<T> spec, int numAttempts)
+  {
+    T task = spec.newSubTask(numAttempts);
+    try {
+      indexingServiceClient.runTask(task);
+    }
+    catch (Exception e) {
+      if (isUnknownTypeIdException(e)) {
+        log.warn(e, "Got an unknown type id error. Retrying with a backward 
compatible type.");
+        task = spec.newSubTaskWithBackwardCompatibleType(numAttempts);
+        indexingServiceClient.runTask(task);
+      } else {
+        throw e;
+      }
+    }
+    return task;
+  }
+
+  private boolean isUnknownTypeIdException(Throwable e)
+  {
+    if (e instanceof IllegalStateException) {
+      if (e.getMessage() != null && e.getMessage().contains("Could not resolve 
type id")) {
+        return true;
+      }
+    }
+    if (e.getCause() != null) {
+      return isUnknownTypeIdException(e.getCause());
+    } else {
+      return false;
+    }
+  }
+
   private void incrementNumRunningTasks()
   {
     synchronized (taskCountLock) {
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
index 33011f9..175821d 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/TestUtils.java
@@ -28,6 +28,8 @@ import org.apache.druid.client.indexing.IndexingServiceClient;
 import org.apache.druid.client.indexing.NoopIndexingServiceClient;
 import 
org.apache.druid.indexing.common.stats.DropwizardRowIngestionMetersFactory;
 import org.apache.druid.indexing.common.stats.RowIngestionMetersFactory;
+import org.apache.druid.indexing.common.task.IndexTaskClientFactory;
+import org.apache.druid.indexing.common.task.NoopIndexTaskClientFactory;
 import org.apache.druid.indexing.common.task.TestAppenderatorsManager;
 import org.apache.druid.jackson.DefaultObjectMapper;
 import org.apache.druid.java.util.common.ISE;
@@ -92,6 +94,7 @@ public class TestUtils
             .addValue(AuthorizerMapper.class, new 
AuthorizerMapper(ImmutableMap.of()))
             .addValue(AppenderatorsManager.class, new 
TestAppenderatorsManager())
             .addValue(LocalDataSegmentPuller.class, new 
LocalDataSegmentPuller())
+            .addValue(IndexTaskClientFactory.class, new 
NoopIndexTaskClientFactory())
     );
 
     jsonMapper.registerModule(
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopIndexTaskClientFactory.java
similarity index 51%
copy from 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
copy to 
indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopIndexTaskClientFactory.java
index 2e4ea8d..8138f7c 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexIOConfig.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/NoopIndexTaskClientFactory.java
@@ -17,28 +17,23 @@
  * under the License.
  */
 
-package org.apache.druid.indexing.common.task.batch.parallel;
+package org.apache.druid.indexing.common.task;
 
-import com.fasterxml.jackson.annotation.JsonCreator;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.fasterxml.jackson.annotation.JsonTypeName;
-import org.apache.druid.data.input.FirehoseFactory;
-import org.apache.druid.indexing.common.task.IndexTask.IndexIOConfig;
+import org.apache.druid.indexing.common.IndexTaskClient;
+import org.apache.druid.indexing.common.TaskInfoProvider;
+import org.joda.time.Duration;
 
-import javax.annotation.Nullable;
-
-/**
- * Same with {@link IndexIOConfig} except its JSON type name.
- */
-@JsonTypeName("index_parallel")
-public class ParallelIndexIOConfig extends IndexIOConfig
+public class NoopIndexTaskClientFactory implements IndexTaskClientFactory
 {
-  @JsonCreator
-  public ParallelIndexIOConfig(
-      @JsonProperty("firehose") FirehoseFactory firehoseFactory,
-      @JsonProperty("appendToExisting") @Nullable Boolean appendToExisting
+  @Override
+  public IndexTaskClient build(
+      TaskInfoProvider taskInfoProvider,
+      String callerId,
+      int numThreads,
+      Duration httpTimeout,
+      long numRetries
   )
   {
-    super(firehoseFactory, appendToExisting);
+    throw new UnsupportedOperationException();
   }
 }
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java
new file mode 100644
index 0000000..e38dcb3
--- /dev/null
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseSubTaskSpecTest.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.InputRowParser;
+import org.apache.druid.data.input.impl.JSONParseSpec;
+import org.apache.druid.data.input.impl.StringInputRowParser;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexing.common.TestUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.query.aggregation.AggregatorFactory;
+import org.apache.druid.segment.indexing.DataSchema;
+import org.apache.druid.segment.realtime.firehose.LocalFirehoseFactory;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+public class SinglePhaseSubTaskSpecTest
+{
+  private static final ObjectMapper MAPPER = new 
TestUtils().getTestObjectMapper();
+
+  private static ParallelIndexIngestionSpec createParallelIndexIngestionSpec() 
throws IOException
+  {
+    final InputRowParser parser = new StringInputRowParser(
+        new JSONParseSpec(
+            new TimestampSpec(null, null, null),
+            new DimensionsSpec(null),
+            null,
+            null
+        ),
+        StringUtils.UTF8_STRING
+    );
+    final Map<String, Object> parserMap = 
MAPPER.readValue(MAPPER.writeValueAsBytes(parser), Map.class);
+    return new ParallelIndexIngestionSpec(
+        new DataSchema(
+            "dataSource",
+            parserMap,
+            new AggregatorFactory[0],
+            null,
+            null,
+            MAPPER
+        ),
+        new ParallelIndexIOConfig(
+            new LocalFirehoseFactory(new File("baseDir"), "filter", null),
+            null
+        ),
+        null
+    );
+  }
+
+  private SinglePhaseSubTaskSpec spec;
+
+  @Before
+  public void setup() throws IOException
+  {
+    spec = new SinglePhaseSubTaskSpec(
+        "id",
+        "groupId",
+        "supervisorTaskId",
+        createParallelIndexIngestionSpec(),
+        null,
+        new InputSplit<>("string split")
+    );
+  }
+
+  @Test
+  public void testNewSubTaskType() throws IOException
+  {
+    final SinglePhaseSubTask expected = spec.newSubTask(0);
+    final byte[] json = MAPPER.writeValueAsBytes(expected);
+    final Map<String, Object> actual = MAPPER.readValue(json, Map.class);
+    Assert.assertEquals(SinglePhaseSubTask.TYPE, actual.get("type"));
+  }
+
+  @Test
+  public void testNewSubTaskWithBackwardCompatibleType() throws IOException
+  {
+    final SinglePhaseSubTask expected = 
spec.newSubTaskWithBackwardCompatibleType(0);
+    final byte[] json = MAPPER.writeValueAsBytes(expected);
+    final Map<String, Object> actual = MAPPER.readValue(json, Map.class);
+    Assert.assertEquals(SinglePhaseSubTask.OLD_TYPE_NAME, actual.get("type"));
+  }
+}
diff --git 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
index 6043768..07a231e 100644
--- 
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
+++ 
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/TaskMonitorTest.java
@@ -32,6 +32,7 @@ import org.apache.druid.indexing.common.TaskToolbox;
 import org.apache.druid.indexing.common.task.NoopTask;
 import 
org.apache.druid.indexing.common.task.batch.parallel.TaskMonitor.SubTaskCompleteEvent;
 import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.ISE;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.junit.After;
 import org.junit.Assert;
@@ -77,7 +78,7 @@ public class TaskMonitorTest
     final List<ListenableFuture<SubTaskCompleteEvent<TestTask>>> futures = 
IntStream
         .range(0, 10)
         .mapToObj(i -> monitor.submit(
-            new TestTaskSpec("specId" + i, "groupId", "supervisorId", null, 
new IntegerInputSplit(i), 100L, 0)
+            new TestTaskSpec("specId" + i, "groupId", "supervisorId", null, 
new IntegerInputSplit(i), 100L, 0, false)
         ))
         .collect(Collectors.toList());
     for (int i = 0; i < futures.size(); i++) {
@@ -98,7 +99,16 @@ public class TaskMonitorTest
     final List<TestTaskSpec> specs = IntStream
         .range(0, 10)
         .mapToObj(
-            i -> new TestTaskSpec("specId" + i, "groupId", "supervisorId", 
null, new IntegerInputSplit(i), 100L, 2)
+            i -> new TestTaskSpec(
+                "specId" + i,
+                "groupId",
+                "supervisorId",
+                null,
+                new IntegerInputSplit(i),
+                100L,
+                2,
+                false
+            )
         )
         .collect(Collectors.toList());
     final List<ListenableFuture<SubTaskCompleteEvent<TestTask>>> futures = 
specs
@@ -127,43 +137,97 @@ public class TaskMonitorTest
     }
   }
 
+  @Test
+  public void testResubmitWithOldType() throws InterruptedException, 
ExecutionException, TimeoutException
+  {
+    final List<TestTaskSpec> specs = IntStream
+        .range(0, 10)
+        .mapToObj(
+            i -> new TestTaskSpec(
+                "specId" + i,
+                "groupId",
+                "supervisorId",
+                null,
+                new IntegerInputSplit(i),
+                100L,
+                0,
+                true
+            )
+        )
+        .collect(Collectors.toList());
+    final List<ListenableFuture<SubTaskCompleteEvent<TestTask>>> futures = 
specs
+        .stream()
+        .map(monitor::submit)
+        .collect(Collectors.toList());
+    for (int i = 0; i < futures.size(); i++) {
+      // # of threads of taskRunner is 5, and each task is expected to be run 
only once (numMaxFails is 0), so the
+      // 2-second timeout below should be ample. The rejected first submission 
must not count as an attempt.
+      final SubTaskCompleteEvent<TestTask> result = futures.get(i).get(2, 
TimeUnit.SECONDS);
+      Assert.assertEquals("supervisorId", 
result.getSpec().getSupervisorTaskId());
+      Assert.assertEquals("specId" + i, result.getSpec().getId());
+
+      Assert.assertNotNull(result.getLastStatus());
+      Assert.assertEquals(TaskState.SUCCESS, 
result.getLastStatus().getStatusCode());
+      Assert.assertEquals(TaskState.SUCCESS, result.getLastState());
+
+      final TaskHistory<TestTask> taskHistory = 
monitor.getCompleteSubTaskSpecHistory(specs.get(i).getId());
+      Assert.assertNotNull(taskHistory);
+
+      final List<TaskStatusPlus> attemptHistory = 
taskHistory.getAttemptHistory();
+      Assert.assertNotNull(attemptHistory);
+      Assert.assertEquals(1, attemptHistory.size());
+      Assert.assertEquals(TaskState.SUCCESS, 
attemptHistory.get(0).getStatusCode());
+    }
+  }
+
   private static class TestTaskSpec extends SubTaskSpec<TestTask>
   {
     private final long runTime;
     private final int numMaxFails;
+    private final boolean throwUnknownTypeIdError;
 
     private int numFails;
 
-    public TestTaskSpec(
+    TestTaskSpec(
         String id,
         String groupId,
         String supervisorTaskId,
         Map<String, Object> context,
         InputSplit inputSplit,
         long runTime,
-        int numMaxFails
+        int numMaxFails,
+        boolean throwUnknownTypeIdError
     )
     {
       super(id, groupId, supervisorTaskId, context, inputSplit);
       this.runTime = runTime;
       this.numMaxFails = numMaxFails;
+      this.throwUnknownTypeIdError = throwUnknownTypeIdError;
     }
 
     @Override
     public TestTask newSubTask(int numAttempts)
     {
-      return new TestTask(getId(), runTime, numFails++ < numMaxFails);
+      return new TestTask(getId(), runTime, numFails++ < numMaxFails, 
throwUnknownTypeIdError);
+    }
+
+    @Override
+    public TestTask newSubTaskWithBackwardCompatibleType(int numAttempts)
+    {
+      return new TestTask(getId(), runTime, numFails++ < numMaxFails, false);
     }
   }
 
   private static class TestTask extends NoopTask
   {
     private final boolean shouldFail;
+    private final boolean throwUnknownTypeIdError;
 
-    TestTask(String id, long runTime, boolean shouldFail)
+    TestTask(String id, long runTime, boolean shouldFail, boolean 
throwUnknownTypeIdError)
     {
       super(id, null, "testDataSource", runTime, 0, null, null, null);
       this.shouldFail = shouldFail;
+      this.throwUnknownTypeIdError = throwUnknownTypeIdError;
     }
 
     @Override
@@ -185,6 +249,9 @@ public class TaskMonitorTest
     {
       final TestTask task = (TestTask) taskObject;
       tasks.put(task.getId(), TaskState.RUNNING);
+      if (task.throwUnknownTypeIdError) {
+        throw new RuntimeException(new ISE("Could not resolve type id 
'test_task_id'"));
+      }
       taskRunner.submit(() -> tasks.put(task.getId(), 
task.run(null).getStatusCode()));
       return task.getId();
     }
@@ -213,7 +280,7 @@ public class TaskMonitorTest
 
   private static class IntegerInputSplit extends InputSplit<Integer>
   {
-    public IntegerInputSplit(int split)
+    IntegerInputSplit(int split)
     {
       super(split);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to