This is an automated email from the ASF dual-hosted git repository.

jihoonson pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/0.17.0 by this push:
     new c0f4dfb  More tests for range partition parallel indexing (#9232) (#9236)
c0f4dfb is described below

commit c0f4dfb538af1e0a0a48fa67dce7e6784d167a6e
Author: Chi Cao Minh <[email protected]>
AuthorDate: Tue Jan 21 15:23:12 2020 -0800

    More tests for range partition parallel indexing (#9232) (#9236)
    
    Add more unit tests for range partition native batch parallel indexing.
    
    Also, fix a bug where ParallelIndexPhaseRunner incorrectly treats
    identical collected DimensionDistributionReports as unequal because
    DimensionDistributionReport did not override equals().
---
 indexing-service/pom.xml                           |  5 ++++
 .../parallel/DimensionDistributionReport.java      | 21 +++++++++++++
 .../batch/parallel/distribution/StringSketch.java  | 35 ++++++++++++++++++++++
 .../AbstractMultiPhaseParallelIndexingTest.java    | 11 ++++---
 .../parallel/DimensionDistributionReportTest.java  |  9 ++++++
 ...ashPartitionMultiPhaseParallelIndexingTest.java |  4 ++-
 ...ngePartitionMultiPhaseParallelIndexingTest.java | 22 +++++++++-----
 .../parallel/distribution/StringSketchTest.java    | 10 +++++++
 pom.xml                                            |  2 +-
 9 files changed, 106 insertions(+), 13 deletions(-)

diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index b6d373a..a1c0f71 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -272,6 +272,11 @@
             <artifactId>assertj-core</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>nl.jqno.equalsverifier</groupId>
+            <artifactId>equalsverifier</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java
index a2e6dd0..a7aea29 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java
@@ -25,6 +25,7 @@ import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringD
 import org.joda.time.Interval;
 
 import java.util.Map;
+import java.util.Objects;
 
 public class DimensionDistributionReport implements SubTaskReport
 {
@@ -65,4 +66,24 @@ public class DimensionDistributionReport implements SubTaskReport
            ", intervalToDistribution=" + intervalToDistribution +
            '}';
   }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    DimensionDistributionReport that = (DimensionDistributionReport) o;
+    return Objects.equals(taskId, that.taskId) &&
+           Objects.equals(intervalToDistribution, that.intervalToDistribution);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(taskId, intervalToDistribution);
+  }
 }
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
index bba16cc..dc4dbe1 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
@@ -37,6 +37,7 @@ import org.apache.datasketches.quantiles.ItemsSketch;
 
 import java.io.IOException;
 import java.util.Comparator;
+import java.util.Objects;
 
 /**
  * Counts approximate frequencies of strings.
@@ -137,6 +138,40 @@ public class StringSketch implements StringDistribution
            '}';
   }
 
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    StringSketch that = (StringSketch) o;
+
+    // ParallelIndexPhaseRunner.collectReport() uses equals() to check subtasks send identical reports if they retry.
+    // However, ItemsSketch does not override equals(): https://github.com/apache/incubator-datasketches-java/issues/140
+    //
+    // Since ItemsSketch has built-in non-determinism, only rely on ItemsSketch properties that are deterministic. This
+    // check is best-effort as it is possible for it to return true for sketches that contain different values.
+    return delegate.getK() == that.delegate.getK() &&
+           delegate.getN() == that.delegate.getN() &&
+           Objects.equals(delegate.getMaxValue(), that.delegate.getMaxValue()) &&
+           Objects.equals(delegate.getMinValue(), that.delegate.getMinValue());
+  }
+
+  @Override
+  public int hashCode()
+  {
+    // See comment in equals() regarding ItemsSketch.
+    return Objects.hash(
+        delegate.getK(),
+        delegate.getN(),
+        delegate.getMaxValue(),
+        delegate.getMinValue()
+    );
+  }
+
   ItemsSketch<String> getDelegate()
   {
     return delegate;
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index 880d806..a7c4396 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -107,7 +107,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
       Interval interval,
       File inputDir,
       String filter,
-      DimensionBasedPartitionsSpec partitionsSpec
+      DimensionBasedPartitionsSpec partitionsSpec,
+      int maxNumConcurrentSubTasks
   ) throws Exception
   {
     final ParallelIndexSupervisorTask task = newTask(
@@ -115,7 +116,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
         interval,
         inputDir,
         filter,
-        partitionsSpec
+        partitionsSpec,
+        maxNumConcurrentSubTasks
     );
 
     actionClient = createActionClient(task);
@@ -137,7 +139,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
       Interval interval,
       File inputDir,
       String filter,
-      DimensionBasedPartitionsSpec partitionsSpec
+      DimensionBasedPartitionsSpec partitionsSpec,
+      int maxNumConcurrentSubTasks
   )
   {
     GranularitySpec granularitySpec = new UniformGranularitySpec(
@@ -163,7 +166,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest extends AbstractParallelIn
         null,
         null,
         null,
-        2,
+        maxNumConcurrentSubTasks,
         null,
         null,
         null,
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java
index c23362f..e82041b 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.common.task.batch.parallel;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
 import org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
 import org.apache.druid.java.util.common.Intervals;
@@ -52,4 +53,12 @@ public class DimensionDistributionReportTest
   {
     TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
   }
+
+  @Test
+  public void abidesEqualsContract()
+  {
+    EqualsVerifier.forClass(DimensionDistributionReport.class)
+                  .usingGetClass()
+                  .verify();
+  }
 }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
index dffd9d5..7219f16 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
@@ -77,6 +77,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
       false,
       0
   );
+  private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2;
 
   @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
   public static Iterable<Object[]> constructorFeeder()
@@ -129,7 +130,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest extends AbstractMultiPh
         Intervals.of("2017/2018"),
         inputDir,
         "test_*",
-        new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2"))
+        new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
+        MAX_NUM_CONCURRENT_SUB_TASKS
     );
     assertHashedPartition(publishedSegments);
   }
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index 94ccf5c..53cb759 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -100,22 +100,30 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
       0
   );
 
-  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
+  @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}, maxNumConcurrentSubTasks={2}")
   public static Iterable<Object[]> constructorFeeder()
   {
     return ImmutableList.of(
-        new Object[]{LockGranularity.TIME_CHUNK, false},
-        new Object[]{LockGranularity.TIME_CHUNK, true},
-        new Object[]{LockGranularity.SEGMENT, true}
+        new Object[]{LockGranularity.TIME_CHUNK, false, 2},
+        new Object[]{LockGranularity.TIME_CHUNK, true, 2},
+        new Object[]{LockGranularity.SEGMENT, true, 2},
+        new Object[]{LockGranularity.SEGMENT, true, 1}  // currently spawns subtask instead of running in supervisor
     );
   }
 
   private File inputDir;
   private SetMultimap<Interval, String> intervalToDim1;
 
-  public RangePartitionMultiPhaseParallelIndexingTest(LockGranularity lockGranularity, boolean useInputFormatApi)
+  private final int maxNumConcurrentSubTasks;
+
+  public RangePartitionMultiPhaseParallelIndexingTest(
+      LockGranularity lockGranularity,
+      boolean useInputFormatApi,
+      int maxNumConcurrentSubTasks
+  )
   {
     super(lockGranularity, useInputFormatApi);
+    this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
   }
 
   @Override
@@ -169,7 +177,8 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
             null,
             DIM1,
             false
-        )
+        ),
+        maxNumConcurrentSubTasks
     );
     assertRangePartitions(publishedSegments);
   }
@@ -362,7 +371,6 @@ public class RangePartitionMultiPhaseParallelIndexingTest extends AbstractMultiP
     }
   }
 
-
   private static class TestPartialGenericSegmentMergeParallelIndexTaskRunner
       extends PartialGenericSegmentMergeParallelIndexTaskRunner
   {
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java
index b09634d..d82e212 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java
@@ -20,6 +20,7 @@
 package org.apache.druid.indexing.common.task.batch.parallel.distribution;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
 import org.apache.datasketches.quantiles.ItemsSketch;
 import org.apache.druid.jackson.JacksonModule;
 import org.apache.druid.java.util.common.StringUtils;
@@ -69,6 +70,15 @@ public class StringSketchTest
       target.put(MAX_STRING);
       TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
     }
+
+    @Test
+    public void abidesEqualsContract()
+    {
+      EqualsVerifier.forClass(StringSketch.class)
+                    .usingGetClass()
+                    .withNonnullFields("delegate")
+                    .verify();
+    }
   }
 
   public static class PutTest
diff --git a/pom.xml b/pom.xml
index e06bca7..93664fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1201,7 +1201,7 @@
             <dependency>
                 <groupId>nl.jqno.equalsverifier</groupId>
                 <artifactId>equalsverifier</artifactId>
-                <version>3.1.10</version>
+                <version>3.1.11</version>
                 <scope>test</scope>
             </dependency>
             <dependency>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to