This is an automated email from the ASF dual-hosted git repository.
surekha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/master by this push:
new c4716d1 Fix ParallelIndexTask when publishing empty segments (#6807)
c4716d1 is described below
commit c4716d1639b89b6a7885732b712e6603b768f886
Author: Jihoon Son <[email protected]>
AuthorDate: Tue Jan 8 17:15:16 2019 -0800
Fix ParallelIndexTask when publishing empty segments (#6807)
* Fix ParallelIndexTask when publishing empty segments
* unused import
---
.../parallel/SinglePhaseParallelIndexTaskRunner.java | 5 +++--
.../parallel/ParallelIndexSupervisorTaskTest.java | 18 ++++++++++++++++++
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
index a6b6951..e41a1a1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/SinglePhaseParallelIndexTaskRunner.java
@@ -390,10 +390,11 @@ public class SinglePhaseParallelIndexTaskRunner
implements ParallelIndexTaskRunn
.stream()
.flatMap(report -> report.getSegments().stream())
.collect(Collectors.toSet());
- final boolean published = publisher.publishSegments(segmentsToPublish,
null).isSuccess();
+ final boolean published = segmentsToPublish.isEmpty()
+ || publisher.publishSegments(segmentsToPublish,
null).isSuccess();
if (published) {
- log.info("Published segments");
+ log.info("Published [%d] segments", segmentsToPublish.size());
} else {
log.info("Transaction failure while publishing segments, checking if
someone else beat us to it.");
final Set<SegmentIdentifier> segmentsIdentifiers = segmentsMap
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
index ba5254d..2f35901 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/ParallelIndexSupervisorTaskTest.java
@@ -187,6 +187,24 @@ public class ParallelIndexSupervisorTaskTest extends
AbstractParallelIndexSuperv
Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
}
+ @Test
+ public void testPublishEmptySegments() throws Exception
+ {
+ final ParallelIndexSupervisorTask task = newTask(
+ Intervals.of("2020/2021"),
+ new ParallelIndexIOConfig(
+ new LocalFirehoseFactory(inputDir, "test_*", null),
+ false
+ )
+ );
+ actionClient = createActionClient(task);
+ toolbox = createTaskToolbox(task);
+
+ prepareTaskForLocking(task);
+ Assert.assertTrue(task.isReady(actionClient));
+ Assert.assertEquals(TaskState.SUCCESS, task.run(toolbox).getStatusCode());
+ }
+
private ParallelIndexSupervisorTask newTask(
Interval interval,
ParallelIndexIOConfig ioConfig
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]