This is an automated email from the ASF dual-hosted git repository.

maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 06bae29  Fix ingestion task failure when no input split to process (#11553)
06bae29 is described below

commit 06bae29979eed20ccc4a75374a5259af02c71014
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Mon Aug 9 23:11:08 2021 +0700

    Fix ingestion task failure when no input split to process (#11553)
    
    * fix ingestion task failure when no input split to process
    
    * add IT
    
    * fix IT
---
 .../batch/parallel/ParallelIndexPhaseRunner.java   |  6 +++-
 .../parallel/SinglePhaseParallelIndexingTest.java  | 32 ++++++++++++++++++----
 .../tests/indexer/AbstractITBatchIndexTest.java    |  2 +-
 .../apache/druid/tests/indexer/ITIndexerTest.java  | 29 ++++++++++++++++++++
 4 files changed, 62 insertions(+), 7 deletions(-)

diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
index b34241a..890cb90 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
@@ -289,13 +289,17 @@ public abstract class ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
   @Override
   public void collectReport(SubTaskReportType report)
   {
+    // This method is only called when there is a subtask sending its report.
+    // Since TaskMonitor is responsible for spawning subtasks, the taskMonitor cannot be null if we have a subtask sending a report.
+    // This null check is to ensure that the contract mentioned above is not broken.
+    assert taskMonitor != null;
     taskMonitor.collectReport(report);
   }
 
   @Override
   public Map<String, SubTaskReportType> getReports()
   {
-    return taskMonitor.getReports();
+    return taskMonitor == null ? Collections.emptyMap() : taskMonitor.getReports();
   }
 
   @Override
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 7b9ac3d..f04559f 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -81,6 +81,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
   }
 
   private static final Interval INTERVAL_TO_INDEX = Intervals.of("2017-12/P1M");
+  private static final String VALID_INPUT_SOURCE_FILTER = "test_*";
 
   private final LockGranularity lockGranularity;
   private final boolean useInputFormatApi;
@@ -368,7 +369,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
             null,
             null,
             null
-        )
+        ),
+        VALID_INPUT_SOURCE_FILTER
     );
     task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
     Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
@@ -394,6 +396,24 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
   }
 
   @Test
+  public void testRunParallelWithNoInputSplitToProcess()
+  {
+    // The input source filter on this task does not match any input
+    // Hence, this task will have no input split to process
+    final ParallelIndexSupervisorTask task = newTask(
+        Intervals.of("2017-12/P1M"),
+        Granularities.DAY,
+        true,
+        true,
+        AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
+        "non_existing_file_filter"
+    );
+    task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity == LockGranularity.TIME_CHUNK);
+    // Task state should still be SUCCESS even if there is no input split to process
+    Assert.assertEquals(TaskState.SUCCESS, getIndexingServiceClient().runAndWait(task).getStatusCode());
+  }
+
+  @Test
   public void testOverwriteAndAppend()
   {
     final Interval interval = Intervals.of("2017-12/P1M");
@@ -437,7 +457,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
         segmentGranularity,
         appendToExisting,
         splittableInputSource,
-        AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING
+        AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
+        VALID_INPUT_SOURCE_FILTER
     );
   }
 
@@ -446,7 +467,8 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
       Granularity segmentGranularity,
       boolean appendToExisting,
       boolean splittableInputSource,
-      ParallelIndexTuningConfig tuningConfig
+      ParallelIndexTuningConfig tuningConfig,
+      String inputSourceFilter
   )
   {
     // set up ingestion spec
@@ -469,7 +491,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
           ),
           new ParallelIndexIOConfig(
               null,
-              new SettableSplittableLocalInputSource(inputDir, "test_*", splittableInputSource),
+              new SettableSplittableLocalInputSource(inputDir, inputSourceFilter, splittableInputSource),
               DEFAULT_INPUT_FORMAT,
               appendToExisting,
               null
@@ -499,7 +521,7 @@ public class SinglePhaseParallelIndexingTest extends AbstractParallelIndexSuperv
               getObjectMapper()
           ),
           new ParallelIndexIOConfig(
-              new LocalFirehoseFactory(inputDir, "test_*", null),
+              new LocalFirehoseFactory(inputDir, inputSourceFilter, null),
               appendToExisting
           ),
           tuningConfig
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index acab50d..6dca729 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -302,7 +302,7 @@ public abstract class AbstractITBatchIndexTest extends AbstractIndexerTest
     }
   }
 
-  private void submitTaskAndWait(
+  protected void submitTaskAndWait(
       String taskSpec,
       String dataSourceName,
       boolean waitForNewVersion,
diff --git a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index ec9c753..833a0ac 100644
--- a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++ b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -170,6 +170,35 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
   }
 
   @Test
+  public void testReIndexWithNonExistingDatasource() throws Exception
+  {
+    Pair<Boolean, Boolean> dummyPair = new Pair<>(false, false);
+    final String fullBaseDatasourceName = "nonExistingDatasource2904";
+    final String fullReindexDatasourceName = "newDatasource123";
+
+    String taskSpec = StringUtils.replace(
+        getResourceAsString(REINDEX_TASK_WITH_DRUID_INPUT_SOURCE),
+        "%%DATASOURCE%%",
+        fullBaseDatasourceName
+    );
+    taskSpec = StringUtils.replace(
+        taskSpec,
+        "%%REINDEX_DATASOURCE%%",
+        fullReindexDatasourceName
+    );
+
+    // This method will also verify that the task is successful after it finishes running
+    // We expect the task to be successful even if the datasource to reindex does not exist
+    submitTaskAndWait(
+        taskSpec,
+        fullReindexDatasourceName,
+        false,
+        false,
+        dummyPair
+    );
+  }
+
+  @Test
   public void testMERGEIndexData() throws Exception
   {
     final String reindexDatasource = MERGE_REINDEX_DATASOURCE + "-testMergeIndexData";
