This is an automated email from the ASF dual-hosted git repository. cwylie pushed a commit to branch 0.14.0-incubating in repository https://gitbox.apache.org/repos/asf/incubator-druid.git
The following commit(s) were added to refs/heads/0.14.0-incubating by this push: new e07537b bugfix: when building materialized-view, if taskCount>1, may cause concurrentModificationException (#6690) (#7165) e07537b is described below commit e07537b845f6f655564e5c8d1ada64da80522177 Author: Jonathan Wei <jon-...@users.noreply.github.com> AuthorDate: Thu Feb 28 18:20:00 2019 -0800 bugfix: when building materialized-view, if taskCount>1, may cause concurrentModificationException (#6690) (#7165) * bugfix: when building materialized-view, if taskCount >1, may cause ConcurrentModificationException * remove entry after iteration instead of using ConcurrentMap, and add unit test * small change * modify unit test for coverage * remove unused method --- .../MaterializedViewSupervisor.java | 15 +++- .../MaterializedViewSupervisorTest.java | 83 ++++++++++++++++++++++ 2 files changed, 96 insertions(+), 2 deletions(-) diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java index 1ccd812..105afdf 100644 --- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java +++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java @@ -270,13 +270,18 @@ public class MaterializedViewSupervisor implements Supervisor void checkSegmentsAndSubmitTasks() { synchronized (taskLock) { + List<Interval> intervalsToRemove = new ArrayList<>(); for (Map.Entry<Interval, HadoopIndexTask> entry : runningTasks.entrySet()) { Optional<TaskStatus> taskStatus = taskStorage.getStatus(entry.getValue().getId()); if (!taskStatus.isPresent() || !taskStatus.get().isRunnable()) { - runningTasks.remove(entry.getKey()); - runningVersion.remove(entry.getKey()); + intervalsToRemove.add(entry.getKey()); } } + for (Interval interval : intervalsToRemove) { + runningTasks.remove(interval); + runningVersion.remove(interval); + } + if (runningTasks.size() == maxTaskCount) { //if the number of running tasks reach the max task count, supervisor won't submit new tasks. return; @@ -288,6 +293,12 @@ public class MaterializedViewSupervisor implements Supervisor submitTasks(sortedToBuildVersion, baseSegments); } } + + @VisibleForTesting + Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> getRunningTasks() + { + return new Pair<>(runningTasks, runningVersion); + } /** * Find infomation about the intervals in which derived dataSource data should be rebuilt. diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java index 7b575f0..1bf1c39 100644 --- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java +++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java @@ -27,7 +27,11 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.indexer.HadoopIOConfig; +import org.apache.druid.indexer.HadoopIngestionSpec; import org.apache.druid.indexer.HadoopTuningConfig; +import org.apache.druid.indexer.TaskStatus; +import org.apache.druid.indexing.common.task.HadoopIndexTask; import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator; import org.apache.druid.indexing.overlord.TaskMaster; import org.apache.druid.indexing.overlord.TaskQueue; @@ -41,7 +45,9 @@ import org.apache.druid.metadata.TestDerbyConnector; import org.apache.druid.query.aggregation.AggregatorFactory; import org.apache.druid.query.aggregation.LongSumAggregatorFactory; import org.apache.druid.segment.TestHelper; +import org.apache.druid.segment.indexing.DataSchema; import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider; +import org.apache.druid.segment.transform.TransformSpec; import org.apache.druid.server.security.AuthorizerMapper; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec; @@ -176,6 +182,83 @@ public class MaterializedViewSupervisorTest Assert.assertEquals(expectedSegments, toBuildInterval.rhs); } + @Test + public void testCheckSegmentsAndSubmitTasks() throws IOException + { + Set<DataSegment> baseSegments = Sets.newHashSet( + new DataSegment( + "base", + Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), + "2015-01-03", + ImmutableMap.of(), + ImmutableList.of("dim1", "dim2"), + ImmutableList.of("m1"), + new HashBasedNumberedShardSpec(0, 1, null, null), + 9, + 1024 + ) + ); + indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments); + expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes(); + expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes(); + expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes(); + expect(taskStorage.getStatus("test_task1")).andReturn(Optional.of(TaskStatus.failure("test_task1"))).anyTimes(); + expect(taskStorage.getStatus("test_task2")).andReturn(Optional.of(TaskStatus.running("test_task2"))).anyTimes(); + EasyMock.replay(taskStorage); + + Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> runningTasksPair = supervisor.getRunningTasks(); + Map<Interval, HadoopIndexTask> runningTasks = runningTasksPair.lhs; + Map<Interval, String> runningVersion = runningTasksPair.rhs; + + DataSchema dataSchema = new DataSchema( + "test_datasource", + null, + null, + null, + TransformSpec.NONE, + objectMapper + ); + HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(new HashMap<>(), null, null); + HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, hadoopIOConfig, null); + HadoopIndexTask task1 = new HadoopIndexTask( + "test_task1", + spec, + null, + null, + null, + objectMapper, + null, + null, + null + ); + runningTasks.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), task1); + runningVersion.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), "test_version1"); + + HadoopIndexTask task2 = new HadoopIndexTask( + "test_task2", + spec, + null, + null, + null, + objectMapper, + null, + null, + null + ); + runningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), task2); + runningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), "test_version2"); + + supervisor.checkSegmentsAndSubmitTasks(); + + Map<Interval, HadoopIndexTask> expectedRunningTasks = new HashMap<>(); + Map<Interval, String> expectedRunningVersion = new HashMap<>(); + expectedRunningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), task2); + expectedRunningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), "test_version2"); + + Assert.assertEquals(expectedRunningTasks, runningTasks); + Assert.assertEquals(expectedRunningVersion, runningVersion); + + } @Test public void testSuspendedDoesntRun() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org