This is an automated email from the ASF dual-hosted git repository.
maytasm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 06bae29 Fix ingestion task failure when no input split to process
(#11553)
06bae29 is described below
commit 06bae29979eed20ccc4a75374a5259af02c71014
Author: Maytas Monsereenusorn <[email protected]>
AuthorDate: Mon Aug 9 23:11:08 2021 +0700
Fix ingestion task failure when no input split to process (#11553)
* fix ingestion task failure when no input split to process
* add IT
* fix IT
---
.../batch/parallel/ParallelIndexPhaseRunner.java | 6 +++-
.../parallel/SinglePhaseParallelIndexingTest.java | 32 ++++++++++++++++++----
.../tests/indexer/AbstractITBatchIndexTest.java | 2 +-
.../apache/druid/tests/indexer/ITIndexerTest.java | 29 ++++++++++++++++++++
4 files changed, 62 insertions(+), 7 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
index b34241a..890cb90 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexPhaseRunner.java
@@ -289,13 +289,17 @@ public abstract class
ParallelIndexPhaseRunner<SubTaskType extends Task, SubTask
@Override
public void collectReport(SubTaskReportType report)
{
+ // This method is only called when a subtask sends its report.
+ // Since TaskMonitor is responsible for spawning subtasks, taskMonitor cannot be null while a subtask is sending a report.
+ // This assertion documents that contract (and enforces it only when assertions are enabled with -ea).
+ assert taskMonitor != null;
taskMonitor.collectReport(report);
}
@Override
public Map<String, SubTaskReportType> getReports()
{
- return taskMonitor.getReports();
+ return taskMonitor == null ? Collections.emptyMap() :
taskMonitor.getReports();
}
@Override
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
index 7b9ac3d..f04559f 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexingTest.java
@@ -81,6 +81,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
}
private static final Interval INTERVAL_TO_INDEX =
Intervals.of("2017-12/P1M");
+ private static final String VALID_INPUT_SOURCE_FILTER = "test_*";
private final LockGranularity lockGranularity;
private final boolean useInputFormatApi;
@@ -368,7 +369,8 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
null,
null,
null
- )
+ ),
+ VALID_INPUT_SOURCE_FILTER
);
task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity ==
LockGranularity.TIME_CHUNK);
Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
@@ -394,6 +396,24 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
}
@Test
+ public void testRunParallelWithNoInputSplitToProcess()
+ {
+ // The input source filter on this task does not match any input
+ // Hence, this task will have no input split to process.
+ final ParallelIndexSupervisorTask task = newTask(
+ Intervals.of("2017-12/P1M"),
+ Granularities.DAY,
+ true,
+ true,
+
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
+ "non_existing_file_filter"
+ );
+ task.addToContext(Tasks.FORCE_TIME_CHUNK_LOCK_KEY, lockGranularity ==
LockGranularity.TIME_CHUNK);
+ // Task state should still be SUCCESS even if there is no input split to process.
+ Assert.assertEquals(TaskState.SUCCESS,
getIndexingServiceClient().runAndWait(task).getStatusCode());
+ }
+
+ @Test
public void testOverwriteAndAppend()
{
final Interval interval = Intervals.of("2017-12/P1M");
@@ -437,7 +457,8 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
segmentGranularity,
appendToExisting,
splittableInputSource,
-
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING
+
AbstractParallelIndexSupervisorTaskTest.DEFAULT_TUNING_CONFIG_FOR_PARALLEL_INDEXING,
+ VALID_INPUT_SOURCE_FILTER
);
}
@@ -446,7 +467,8 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
Granularity segmentGranularity,
boolean appendToExisting,
boolean splittableInputSource,
- ParallelIndexTuningConfig tuningConfig
+ ParallelIndexTuningConfig tuningConfig,
+ String inputSourceFilter
)
{
// set up ingestion spec
@@ -469,7 +491,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
),
new ParallelIndexIOConfig(
null,
- new SettableSplittableLocalInputSource(inputDir, "test_*",
splittableInputSource),
+ new SettableSplittableLocalInputSource(inputDir,
inputSourceFilter, splittableInputSource),
DEFAULT_INPUT_FORMAT,
appendToExisting,
null
@@ -499,7 +521,7 @@ public class SinglePhaseParallelIndexingTest extends
AbstractParallelIndexSuperv
getObjectMapper()
),
new ParallelIndexIOConfig(
- new LocalFirehoseFactory(inputDir, "test_*", null),
+ new LocalFirehoseFactory(inputDir, inputSourceFilter, null),
appendToExisting
),
tuningConfig
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
index acab50d..6dca729 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/AbstractITBatchIndexTest.java
@@ -302,7 +302,7 @@ public abstract class AbstractITBatchIndexTest extends
AbstractIndexerTest
}
}
- private void submitTaskAndWait(
+ protected void submitTaskAndWait(
String taskSpec,
String dataSourceName,
boolean waitForNewVersion,
diff --git
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
index ec9c753..833a0ac 100644
---
a/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
+++
b/integration-tests/src/test/java/org/apache/druid/tests/indexer/ITIndexerTest.java
@@ -170,6 +170,35 @@ public class ITIndexerTest extends AbstractITBatchIndexTest
}
@Test
+ public void testReIndexWithNonExistingDatasource() throws Exception
+ {
+ Pair<Boolean, Boolean> dummyPair = new Pair<>(false, false);
+ final String fullBaseDatasourceName = "nonExistingDatasource2904";
+ final String fullReindexDatasourceName = "newDatasource123";
+
+ String taskSpec = StringUtils.replace(
+ getResourceAsString(REINDEX_TASK_WITH_DRUID_INPUT_SOURCE),
+ "%%DATASOURCE%%",
+ fullBaseDatasourceName
+ );
+ taskSpec = StringUtils.replace(
+ taskSpec,
+ "%%REINDEX_DATASOURCE%%",
+ fullReindexDatasourceName
+ );
+
+ // This method also verifies that the task succeeded once it finishes running.
+ // We expect the task to succeed even if the datasource to reindex does not exist.
+ submitTaskAndWait(
+ taskSpec,
+ fullReindexDatasourceName,
+ false,
+ false,
+ dummyPair
+ );
+ }
+
+ @Test
public void testMERGEIndexData() throws Exception
{
final String reindexDatasource = MERGE_REINDEX_DATASOURCE +
"-testMergeIndexData";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]