This is an automated email from the ASF dual-hosted git repository.

cwylie pushed a commit to branch 0.14.0-incubating
in repository https://gitbox.apache.org/repos/asf/incubator-druid.git


The following commit(s) were added to refs/heads/0.14.0-incubating by this push:
     new e07537b  bugfix: when building materialized-view, if taskCount>1, may cause concurrentModificationException (#6690) (#7165)
e07537b is described below

commit e07537b845f6f655564e5c8d1ada64da80522177
Author: Jonathan Wei <jon-...@users.noreply.github.com>
AuthorDate: Thu Feb 28 18:20:00 2019 -0800

    bugfix: when building materialized-view, if taskCount>1, may cause concurrentModificationException (#6690) (#7165)
    
    * bugfix: when building materialized-view, if taskCount >1, may cause ConcurrentModificationException
    
    * remove entry after iteration instead of using ConcurrentMap, and add unit test
    
    * small change
    
    * modify unit test for coverage
    
    * remove unused method
---
 .../MaterializedViewSupervisor.java                | 15 +++-
 .../MaterializedViewSupervisorTest.java            | 83 ++++++++++++++++++++++
 2 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
index 1ccd812..105afdf 100644
--- a/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
+++ b/extensions-contrib/materialized-view-maintenance/src/main/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisor.java
@@ -270,13 +270,18 @@ public class MaterializedViewSupervisor implements Supervisor
   void checkSegmentsAndSubmitTasks()
   {
     synchronized (taskLock) {
+      List<Interval> intervalsToRemove = new ArrayList<>();
       for (Map.Entry<Interval, HadoopIndexTask> entry : 
runningTasks.entrySet()) {
         Optional<TaskStatus> taskStatus = 
taskStorage.getStatus(entry.getValue().getId());
         if (!taskStatus.isPresent() || !taskStatus.get().isRunnable()) {
-          runningTasks.remove(entry.getKey());
-          runningVersion.remove(entry.getKey());
+          intervalsToRemove.add(entry.getKey());
         }
       }
+      for (Interval interval : intervalsToRemove) {
+        runningTasks.remove(interval);
+        runningVersion.remove(interval);
+      }
+
       if (runningTasks.size() == maxTaskCount) {
         //if the number of running tasks reach the max task count, supervisor 
won't submit new tasks.
         return;
@@ -288,6 +293,12 @@ public class MaterializedViewSupervisor implements Supervisor
       submitTasks(sortedToBuildVersion, baseSegments);
     }
   }
+  
+  @VisibleForTesting
+  Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> getRunningTasks()
+  {
+    return new Pair<>(runningTasks, runningVersion);
+  }
 
   /**
    * Find infomation about the intervals in which derived dataSource data 
should be rebuilt.
diff --git a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
index 7b575f0..1bf1c39 100644
--- a/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
+++ b/extensions-contrib/materialized-view-maintenance/src/test/java/org/apache/druid/indexing/materializedview/MaterializedViewSupervisorTest.java
@@ -27,7 +27,11 @@ import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
 import org.apache.druid.data.input.impl.DimensionsSpec;
 import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.indexer.HadoopIOConfig;
+import org.apache.druid.indexer.HadoopIngestionSpec;
 import org.apache.druid.indexer.HadoopTuningConfig;
+import org.apache.druid.indexer.TaskStatus;
+import org.apache.druid.indexing.common.task.HadoopIndexTask;
 import org.apache.druid.indexing.overlord.IndexerMetadataStorageCoordinator;
 import org.apache.druid.indexing.overlord.TaskMaster;
 import org.apache.druid.indexing.overlord.TaskQueue;
@@ -41,7 +45,9 @@ import org.apache.druid.metadata.TestDerbyConnector;
 import org.apache.druid.query.aggregation.AggregatorFactory;
 import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
 import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.indexing.DataSchema;
 import org.apache.druid.segment.realtime.firehose.ChatHandlerProvider;
+import org.apache.druid.segment.transform.TransformSpec;
 import org.apache.druid.server.security.AuthorizerMapper;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
@@ -176,6 +182,83 @@ public class MaterializedViewSupervisorTest
     Assert.assertEquals(expectedSegments, toBuildInterval.rhs);
   }
 
+  @Test
+  public void testCheckSegmentsAndSubmitTasks() throws IOException
+  {
+    Set<DataSegment> baseSegments = Sets.newHashSet(
+        new DataSegment(
+            "base",
+            Intervals.of("2015-01-02T00Z/2015-01-03T00Z"),
+            "2015-01-03",
+            ImmutableMap.of(),
+            ImmutableList.of("dim1", "dim2"),
+            ImmutableList.of("m1"),
+            new HashBasedNumberedShardSpec(0, 1, null, null),
+            9,
+            1024
+        )
+    );
+    indexerMetadataStorageCoordinator.announceHistoricalSegments(baseSegments);
+    
expect(taskMaster.getTaskQueue()).andReturn(Optional.of(taskQueue)).anyTimes();
+    expect(taskMaster.getTaskRunner()).andReturn(Optional.absent()).anyTimes();
+    
expect(taskStorage.getActiveTasks()).andReturn(ImmutableList.of()).anyTimes();
+    
expect(taskStorage.getStatus("test_task1")).andReturn(Optional.of(TaskStatus.failure("test_task1"))).anyTimes();
+    
expect(taskStorage.getStatus("test_task2")).andReturn(Optional.of(TaskStatus.running("test_task2"))).anyTimes();
+    EasyMock.replay(taskStorage);
+
+    Pair<Map<Interval, HadoopIndexTask>, Map<Interval, String>> 
runningTasksPair = supervisor.getRunningTasks();
+    Map<Interval, HadoopIndexTask> runningTasks = runningTasksPair.lhs;
+    Map<Interval, String> runningVersion = runningTasksPair.rhs;
+
+    DataSchema dataSchema = new DataSchema(
+        "test_datasource",
+        null,
+        null,
+        null,
+        TransformSpec.NONE,
+        objectMapper
+    );
+    HadoopIOConfig hadoopIOConfig = new HadoopIOConfig(new HashMap<>(), null, 
null);
+    HadoopIngestionSpec spec = new HadoopIngestionSpec(dataSchema, 
hadoopIOConfig, null);
+    HadoopIndexTask task1 = new HadoopIndexTask(
+        "test_task1",
+        spec,
+        null,
+        null,
+        null,
+        objectMapper,
+        null,
+        null,
+        null
+    );
+    runningTasks.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), task1);
+    runningVersion.put(Intervals.of("2015-01-01T00Z/2015-01-02T00Z"), 
"test_version1");
+
+    HadoopIndexTask task2 = new HadoopIndexTask(
+        "test_task2",
+        spec,
+        null,
+        null,
+        null,
+        objectMapper,
+        null,
+        null,
+        null
+    );
+    runningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), task2);
+    runningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), 
"test_version2");
+
+    supervisor.checkSegmentsAndSubmitTasks();
+
+    Map<Interval, HadoopIndexTask> expectedRunningTasks = new HashMap<>();
+    Map<Interval, String> expectedRunningVersion = new HashMap<>();
+    expectedRunningTasks.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), 
task2);
+    expectedRunningVersion.put(Intervals.of("2015-01-02T00Z/2015-01-03T00Z"), 
"test_version2");
+
+    Assert.assertEquals(expectedRunningTasks, runningTasks);
+    Assert.assertEquals(expectedRunningVersion, runningVersion);
+
+  }
 
   @Test
   public void testSuspendedDoesntRun()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to