This is an automated email from the ASF dual-hosted git repository.
jihoonson pushed a commit to branch 0.17.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/0.17.0 by this push:
new c0f4dfb More tests for range partition parallel indexing (#9232) (#9236)
c0f4dfb is described below
commit c0f4dfb538af1e0a0a48fa67dce7e6784d167a6e
Author: Chi Cao Minh <[email protected]>
AuthorDate: Tue Jan 21 15:23:12 2020 -0800
More tests for range partition parallel indexing (#9232) (#9236)
Add more unit tests for range partition native batch parallel indexing.
Also, fix a bug where ParallelIndexPhaseRunner incorrectly thinks that
identical collected DimensionDistributionReports are not equal due to
not overriding equals() in DimensionDistributionReport.
---
indexing-service/pom.xml | 5 ++++
.../parallel/DimensionDistributionReport.java | 21 +++++++++++++
.../batch/parallel/distribution/StringSketch.java | 35 ++++++++++++++++++++++
.../AbstractMultiPhaseParallelIndexingTest.java | 11 ++++---
.../parallel/DimensionDistributionReportTest.java | 9 ++++++
...ashPartitionMultiPhaseParallelIndexingTest.java | 4 ++-
...ngePartitionMultiPhaseParallelIndexingTest.java | 22 +++++++++-----
.../parallel/distribution/StringSketchTest.java | 10 +++++++
pom.xml | 2 +-
9 files changed, 106 insertions(+), 13 deletions(-)
diff --git a/indexing-service/pom.xml b/indexing-service/pom.xml
index b6d373a..a1c0f71 100644
--- a/indexing-service/pom.xml
+++ b/indexing-service/pom.xml
@@ -272,6 +272,11 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>nl.jqno.equalsverifier</groupId>
+ <artifactId>equalsverifier</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java
index a2e6dd0..a7aea29 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReport.java
@@ -25,6 +25,7 @@ import
org.apache.druid.indexing.common.task.batch.parallel.distribution.StringD
import org.joda.time.Interval;
import java.util.Map;
+import java.util.Objects;
public class DimensionDistributionReport implements SubTaskReport
{
@@ -65,4 +66,24 @@ public class DimensionDistributionReport implements
SubTaskReport
", intervalToDistribution=" + intervalToDistribution +
'}';
}
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ DimensionDistributionReport that = (DimensionDistributionReport) o;
+ return Objects.equals(taskId, that.taskId) &&
+ Objects.equals(intervalToDistribution, that.intervalToDistribution);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(taskId, intervalToDistribution);
+ }
}
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
index bba16cc..dc4dbe1 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketch.java
@@ -37,6 +37,7 @@ import org.apache.datasketches.quantiles.ItemsSketch;
import java.io.IOException;
import java.util.Comparator;
+import java.util.Objects;
/**
* Counts approximate frequencies of strings.
@@ -137,6 +138,40 @@ public class StringSketch implements StringDistribution
'}';
}
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StringSketch that = (StringSketch) o;
+
+ // ParallelIndexPhaseRunner.collectReport() uses equals() to check subtasks send identical reports if they retry.
+ // However, ItemsSketch does not override equals(): https://github.com/apache/incubator-datasketches-java/issues/140
+ //
+ // Since ItemsSketch has built-in non-determinism, only rely on ItemsSketch properties that are deterministic. This
+ // check is best-effort as it is possible for it to return true for sketches that contain different values.
+ return delegate.getK() == that.delegate.getK() &&
+ delegate.getN() == that.delegate.getN() &&
+ Objects.equals(delegate.getMaxValue(), that.delegate.getMaxValue())
&&
+ Objects.equals(delegate.getMinValue(), that.delegate.getMinValue());
+ }
+
+ @Override
+ public int hashCode()
+ {
+ // See comment in equals() regarding ItemsSketch.
+ return Objects.hash(
+ delegate.getK(),
+ delegate.getN(),
+ delegate.getMaxValue(),
+ delegate.getMinValue()
+ );
+ }
+
ItemsSketch<String> getDelegate()
{
return delegate;
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
index 880d806..a7c4396 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/AbstractMultiPhaseParallelIndexingTest.java
@@ -107,7 +107,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest
extends AbstractParallelIn
Interval interval,
File inputDir,
String filter,
- DimensionBasedPartitionsSpec partitionsSpec
+ DimensionBasedPartitionsSpec partitionsSpec,
+ int maxNumConcurrentSubTasks
) throws Exception
{
final ParallelIndexSupervisorTask task = newTask(
@@ -115,7 +116,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest
extends AbstractParallelIn
interval,
inputDir,
filter,
- partitionsSpec
+ partitionsSpec,
+ maxNumConcurrentSubTasks
);
actionClient = createActionClient(task);
@@ -137,7 +139,8 @@ abstract class AbstractMultiPhaseParallelIndexingTest
extends AbstractParallelIn
Interval interval,
File inputDir,
String filter,
- DimensionBasedPartitionsSpec partitionsSpec
+ DimensionBasedPartitionsSpec partitionsSpec,
+ int maxNumConcurrentSubTasks
)
{
GranularitySpec granularitySpec = new UniformGranularitySpec(
@@ -163,7 +166,7 @@ abstract class AbstractMultiPhaseParallelIndexingTest
extends AbstractParallelIn
null,
null,
null,
- 2,
+ maxNumConcurrentSubTasks,
null,
null,
null,
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java
index c23362f..e82041b 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/DimensionDistributionReportTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel;
import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
import
org.apache.druid.indexing.common.task.batch.parallel.distribution.StringDistribution;
import
org.apache.druid.indexing.common.task.batch.parallel.distribution.StringSketch;
import org.apache.druid.java.util.common.Intervals;
@@ -52,4 +53,12 @@ public class DimensionDistributionReportTest
{
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
}
+
+ @Test
+ public void abidesEqualsContract()
+ {
+ EqualsVerifier.forClass(DimensionDistributionReport.class)
+ .usingGetClass()
+ .verify();
+ }
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
index dffd9d5..7219f16 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/HashPartitionMultiPhaseParallelIndexingTest.java
@@ -77,6 +77,7 @@ public class HashPartitionMultiPhaseParallelIndexingTest
extends AbstractMultiPh
false,
0
);
+ private static final int MAX_NUM_CONCURRENT_SUB_TASKS = 2;
@Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
public static Iterable<Object[]> constructorFeeder()
@@ -129,7 +130,8 @@ public class HashPartitionMultiPhaseParallelIndexingTest
extends AbstractMultiPh
Intervals.of("2017/2018"),
inputDir,
"test_*",
- new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2"))
+ new HashedPartitionsSpec(null, 2, ImmutableList.of("dim1", "dim2")),
+ MAX_NUM_CONCURRENT_SUB_TASKS
);
assertHashedPartition(publishedSegments);
}
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
index 94ccf5c..53cb759 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/RangePartitionMultiPhaseParallelIndexingTest.java
@@ -100,22 +100,30 @@ public class RangePartitionMultiPhaseParallelIndexingTest
extends AbstractMultiP
0
);
- @Parameterized.Parameters(name = "{0}, useInputFormatApi={1}")
+ @Parameterized.Parameters(name = "{0}, useInputFormatApi={1},
maxNumConcurrentSubTasks={2}")
public static Iterable<Object[]> constructorFeeder()
{
return ImmutableList.of(
- new Object[]{LockGranularity.TIME_CHUNK, false},
- new Object[]{LockGranularity.TIME_CHUNK, true},
- new Object[]{LockGranularity.SEGMENT, true}
+ new Object[]{LockGranularity.TIME_CHUNK, false, 2},
+ new Object[]{LockGranularity.TIME_CHUNK, true, 2},
+ new Object[]{LockGranularity.SEGMENT, true, 2},
+ new Object[]{LockGranularity.SEGMENT, true, 1} // currently spawns subtask instead of running in supervisor
);
}
private File inputDir;
private SetMultimap<Interval, String> intervalToDim1;
- public RangePartitionMultiPhaseParallelIndexingTest(LockGranularity
lockGranularity, boolean useInputFormatApi)
+ private final int maxNumConcurrentSubTasks;
+
+ public RangePartitionMultiPhaseParallelIndexingTest(
+ LockGranularity lockGranularity,
+ boolean useInputFormatApi,
+ int maxNumConcurrentSubTasks
+ )
{
super(lockGranularity, useInputFormatApi);
+ this.maxNumConcurrentSubTasks = maxNumConcurrentSubTasks;
}
@Override
@@ -169,7 +177,8 @@ public class RangePartitionMultiPhaseParallelIndexingTest
extends AbstractMultiP
null,
DIM1,
false
- )
+ ),
+ maxNumConcurrentSubTasks
);
assertRangePartitions(publishedSegments);
}
@@ -362,7 +371,6 @@ public class RangePartitionMultiPhaseParallelIndexingTest
extends AbstractMultiP
}
}
-
private static class TestPartialGenericSegmentMergeParallelIndexTaskRunner
extends PartialGenericSegmentMergeParallelIndexTaskRunner
{
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java
index b09634d..d82e212 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/common/task/batch/parallel/distribution/StringSketchTest.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.common.task.batch.parallel.distribution;
import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.datasketches.quantiles.ItemsSketch;
import org.apache.druid.jackson.JacksonModule;
import org.apache.druid.java.util.common.StringUtils;
@@ -69,6 +70,15 @@ public class StringSketchTest
target.put(MAX_STRING);
TestHelper.testSerializesDeserializes(OBJECT_MAPPER, target);
}
+
+ @Test
+ public void abidesEqualsContract()
+ {
+ EqualsVerifier.forClass(StringSketch.class)
+ .usingGetClass()
+ .withNonnullFields("delegate")
+ .verify();
+ }
}
public static class PutTest
diff --git a/pom.xml b/pom.xml
index e06bca7..93664fe 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1201,7 +1201,7 @@
<dependency>
<groupId>nl.jqno.equalsverifier</groupId>
<artifactId>equalsverifier</artifactId>
- <version>3.1.10</version>
+ <version>3.1.11</version>
<scope>test</scope>
</dependency>
<dependency>
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]